package org.apache.hudi.client.functional;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.ClusteringTestUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.class */
public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    /** Logger bound to this test class. */
    private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
    /**
     * Single-entry parameter map: {@code "sortColumn"} maps to
     * {@link SparkDatasetTestUtils#RECORD_KEY_FIELD_NAME}. It is built as an anonymous
     * {@code HashMap} subclass whose instance initializer performs the {@code put}.
     * Its consumers are not visible in this part of the file.
     */
    private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>() {
        {
            put("sortColumn", SparkDatasetTestUtils.RECORD_KEY_FIELD_NAME);
        }
    };
    /** Test table handle; (re)assigned by {@code setUpTestTable()} before each test. */
    private HoodieTestTable testTable;
    /**
     * Row-count query text. The validator tests below configure the same text followed by
     * {@code #<expected count>}; {@code <TABLE_NAME>} is presumably a placeholder substituted
     * by the SQL pre-commit validators (TODO confirm).
     */
    private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from <TABLE_NAME>";

    /* loaded from: input_file:org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage$FailingPreCommitValidator.class */
    public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
        public FailingPreCommitValidator(HoodieSparkTable hoodieSparkTable, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig) {
            super(hoodieSparkTable, hoodieEngineContext, hoodieWriteConfig);
        }

        protected void validateRecordsBeforeAndAfter(Dataset<Row> dataset, Dataset<Row> dataset2, Set<String> set) {
            throw new HoodieValidationException("simulate failure");
        }
    }

    /**
     * Argument provider yielding two single-element argument sets: {@code true} and {@code false}.
     * No consumer is visible in this part of the file; presumably referenced by a
     * {@code @MethodSource} further down.
     *
     * @return a stream with one {@link Arguments} per boolean value
     */
    private static Stream<Arguments> smallInsertHandlingParams() {
        // Each inner array is one argument set; Arguments.of receives it as its varargs array.
        return Arrays.stream(new Boolean[][]{{true}, {false}}).map(Arguments::of);
    }

    /**
     * Argument provider for the {@code @MethodSource("populateMetaFieldsParams")} tests in this
     * class: yields two single-element argument sets, {@code true} and {@code false}.
     *
     * @return a stream with one {@link Arguments} per boolean value
     */
    private static Stream<Arguments> populateMetaFieldsParams() {
        // Each inner array is one argument set; Arguments.of receives it as its varargs array.
        return Arrays.stream(new Boolean[][]{{true}, {false}}).map(Arguments::of);
    }

    /**
     * Argument provider yielding all four combinations of two booleans, in the order
     * {@code (true, true)}, {@code (false, true)}, {@code (true, false)}, {@code (false, false)}.
     * No consumer is visible in this part of the file.
     *
     * @return a stream with one two-element {@link Arguments} per combination
     */
    private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
        Boolean[][] combinations = {
            {true, true},
            {false, true},
            {true, false},
            {false, false}
        };
        // Each inner array is one argument set; Arguments.of receives it as its varargs array.
        return Arrays.stream(combinations).map(Arguments::of);
    }

    /**
     * Argument provider pairing the {@code LAZY} and {@code NEVER} failed-writes cleaning
     * policies with both boolean values (four argument sets in total).
     * No consumer is visible in this part of the file.
     *
     * @return a stream of (policy, boolean) argument sets
     */
    private static Stream<Arguments> rollbackFailedCommitsParams() {
        return Stream.of(
            Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
            Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, false),
            Arguments.of(HoodieFailedWritesCleaningPolicy.NEVER, true),
            Arguments.of(HoodieFailedWritesCleaningPolicy.NEVER, false));
    }

    /**
     * Argument provider yielding all four combinations of two booleans, in the order
     * {@code (true, true)}, {@code (true, false)}, {@code (false, true)}, {@code (false, false)}.
     * No consumer is visible in this part of the file.
     *
     * @return a stream of two-element boolean argument sets
     */
    private static Stream<Arguments> rollbackAfterConsistencyCheckFailureParams() {
        return Stream.of(
            Arguments.of(true, true),
            Arguments.of(true, false),
            Arguments.of(false, true),
            Arguments.of(false, false));
    }

    /** Creates a fresh {@link HoodieSparkWriteableTestTable} over the current meta client before each test. */
    @BeforeEach
    public void setUpTestTable() {
        testTable = HoodieSparkWriteableTestTable.of(metaClient);
    }

    /**
     * Runs the auto-commit scenario with {@code insert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnInsert(boolean z) throws Exception {
        testAutoCommit((client, records, instantTime) -> client.insert(records, instantTime), false, z);
    }

    /**
     * Runs the auto-commit scenario with {@code insertPreppedRecords} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnInsertPrepped(boolean z) throws Exception {
        testAutoCommit((client, records, instantTime) -> client.insertPreppedRecords(records, instantTime), true, z);
    }

    /**
     * Runs the auto-commit scenario with {@code upsert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnUpsert(boolean z) throws Exception {
        testAutoCommit((client, records, instantTime) -> client.upsert(records, instantTime), false, z);
    }

    /**
     * Runs the auto-commit scenario with {@code upsertPreppedRecords} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnUpsertPrepped(boolean z) throws Exception {
        testAutoCommit((client, records, instantTime) -> client.upsertPreppedRecords(records, instantTime), true, z);
    }

    /**
     * Runs the auto-commit scenario with the two-argument {@code bulkInsert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnBulkInsert(boolean z) throws Exception {
        testAutoCommit((client, records, instantTime) -> client.bulkInsert(records, instantTime), false, z);
    }

    /**
     * Runs the auto-commit scenario with {@code bulkInsertPreppedRecords} as the write operation,
     * passing {@code Option.empty()} as its third argument.
     *
     * @param z forwarded as the last argument of {@code testAutoCommit}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnBulkInsertPrepped(boolean z) throws Exception {
        testAutoCommit(
            (client, records, instantTime) -> client.bulkInsertPreppedRecords(records, instantTime, Option.empty()),
            true,
            z);
    }

    /**
     * Shared body of the auto-commit tests. With auto-commit disabled it writes a first batch
     * of 200 records at instant {@code "001"}, asserts that no commit exists yet, commits
     * explicitly, and asserts that the commit now exists.
     *
     * @param function3 write operation applied to (client, records, instant time)
     * @param z         forwarded to {@code insertFirstBatch}; callers in this class pass
     *                  {@code true} exactly for the {@code *Prepped*} write operations
     * @param z2        forwarded to {@code addConfigsForPopulateMetaFields}
     * @throws Exception if the write, the commit, or closing the client fails
     */
    private void testAutoCommit(HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function3, boolean z, boolean z2) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(cfgBuilder, z2);
        // try-with-resources closes the client on every path and keeps the primary exception.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfgBuilder.build())) {
            JavaRDD<WriteStatus> writeStatuses = insertFirstBatch(cfgBuilder.build(), writeClient, "001", "000", 200, function3, z, false, 200);
            Assertions.assertFalse(this.testTable.commitExists("001"), "If Autocommit is false, then commit should not be made automatically");
            Assertions.assertTrue(writeClient.commit("001", writeStatuses), "Commit should succeed");
            Assertions.assertTrue(this.testTable.commitExists("001"), "After explicit commit, commit file should be created");
        }
    }

    /**
     * Verifies that a bulk insert of 200 records commits when the single-result SQL validator
     * is configured with {@code "select count(*) from <TABLE_NAME>#200"} and auto-commit is on.
     *
     * @throws Exception if the write or closing the client fails
     */
    @Test
    public void testPreCommitValidatorsOnInsert() throws Exception {
        HoodieWriteConfig writeConfig = getConfigBuilder().withAutoCommit(true).withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#200").build()).build();
        // try-with-resources closes the client on every path and keeps the primary exception.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
            HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> bulkInsertFn =
                (client, records, instant) -> client.bulkInsert(records, instant, Option.empty());
            String commitTime = HoodieActiveTimeline.createNewInstantTime();
            insertFirstBatch(writeConfig, writeClient, commitTime, "000", 200, bulkInsertFn, false, false, 200);
            Assertions.assertTrue(this.testTable.commitExists(commitTime));
        }
    }

    /**
     * Verifies that a bulk insert of 200 records is rejected when the single-result SQL validator
     * is configured with {@code "select count(*) from <TABLE_NAME>#500"}: the write must fail with
     * a {@link HoodieInsertException} caused by a {@link HoodieValidationException}, and no commit
     * may exist for the instant afterwards.
     *
     * @throws Exception if an unexpected failure occurs or closing the client fails
     */
    @Test
    public void testPreCommitValidationFailureOnInsert() throws Exception {
        HoodieWriteConfig writeConfig = getConfigBuilder().withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#500").build()).build();
        String commitTime = HoodieActiveTimeline.createNewInstantTime();
        // The client is closed before the catch clause runs, including on the expected failure path.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
            insertFirstBatch(writeConfig, writeClient, commitTime, "000", 200,
                (client, records, instant) -> client.bulkInsert(records, instant, Option.empty()),
                false, false, 200);
            Assertions.fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
        } catch (HoodieInsertException e) {
            // Only an insert failure caused by validation is the expected outcome; anything else propagates.
            if (!(e.getCause() instanceof HoodieValidationException)) {
                throw e;
            }
        }
        Assertions.assertFalse(this.testTable.commitExists(commitTime));
    }

    /**
     * First attempts a 200-record insert under a validator expecting 500 rows (cleaning policy
     * {@code NEVER}) and checks that it leaves an inflight instant without a completed commit.
     * Then performs a 300-record insert under a validator expecting 300 rows and checks that it
     * commits while the first instant is still inflight.
     *
     * @throws Exception if an unexpected failure occurs
     */
    @Test
    public void testPreCommitValidationWithMultipleInflights() throws Exception {
        // Attempt 1: validator expects 500 rows, only 200 are written.
        HoodieWriteConfig failingConfig = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
                .build())
            .withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig.newBuilder()
                .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
                .withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#500")
                .build())
            .build();
        String failedInstant = HoodieActiveTimeline.createNewInstantTime();
        try {
            insertWithConfig(failingConfig, 200, failedInstant);
            Assertions.fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
        } catch (HoodieInsertException e) {
            boolean causedByValidation = e.getCause() instanceof HoodieValidationException;
            if (!causedByValidation) {
                throw e;
            }
        }
        Assertions.assertFalse(this.testTable.commitExists(failedInstant));
        Assertions.assertTrue(this.testTable.inflightCommitExists(failedInstant));

        // Attempt 2: validator expects 300 rows and 300 are written.
        HoodieWriteConfig passingConfig = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
                .build())
            .withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig.newBuilder()
                .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
                .withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#300")
                .build())
            .build();
        String committedInstant = HoodieActiveTimeline.createNewInstantTime();
        insertWithConfig(passingConfig, 300, committedInstant);
        Assertions.assertTrue(this.testTable.inflightCommitExists(failedInstant));
        Assertions.assertTrue(this.testTable.commitExists(committedInstant));
    }

    /**
     * Bulk-inserts a first batch of {@code i} records at instant {@code str} using a write client
     * created from {@code hoodieWriteConfig}; the client is closed when the write finishes or fails.
     *
     * @param hoodieWriteConfig config used to create the client and passed to {@code insertFirstBatch}
     * @param i                 record count, passed to {@code insertFirstBatch} in both count positions
     * @param str               instant time of the new commit
     * @throws Exception if the write or closing the client fails
     */
    private void insertWithConfig(HoodieWriteConfig hoodieWriteConfig, int i, String str) throws Exception {
        // try-with-resources: a failure in close() is attached as suppressed instead of
        // replacing the exception thrown by the write itself.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(hoodieWriteConfig)) {
            insertFirstBatch(hoodieWriteConfig, writeClient, str, "000", i,
                (client, records, instant) -> client.bulkInsert(records, instant, Option.empty()),
                false, false, i);
        }
    }

    /**
     * Runs the deduplication scenario with {@code insert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testDeduplication}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationOnInsert(boolean z) throws Exception {
        testDeduplication((client, records, instantTime) -> client.insert(records, instantTime), z);
    }

    /**
     * Runs the deduplication scenario with the two-argument {@code bulkInsert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testDeduplication}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationOnBulkInsert(boolean z) throws Exception {
        testDeduplication((client, records, instantTime) -> client.bulkInsert(records, instantTime), z);
    }

    /**
     * Runs the deduplication scenario with {@code upsert} as the write operation.
     *
     * @param z forwarded as the last argument of {@code testDeduplication}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationOnUpsert(boolean z) throws Exception {
        testDeduplication((client, records, instantTime) -> client.upsert(records, instantTime), z);
    }

    /**
     * Shared body of the deduplication tests.
     *
     * <p>Builds three records that share one record key: one in partition {@code 2018-01-01}
     * and two in partition {@code 2018-02-01}. It then checks that
     * <ol>
     *   <li>{@code HoodieWriteHelper.deduplicateRecords} with an index reporting
     *       {@code isGlobal() == true} yields one record, in the third record's partition;</li>
     *   <li>the same call with {@code isGlobal() == false} yields two records;</li>
     *   <li>writing the three records through {@code function3} with
     *       {@code combineInput(true, true)} yields two write statuses without errors and no
     *       duplicate keys within a partition among the written records.</li>
     * </ol>
     *
     * @param function3 write operation applied to (client, records, instant time)
     * @param z         forwarded to {@code addConfigsForPopulateMetaFields}
     * @throws Exception if the write or closing the client fails
     */
    private void testDeduplication(HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function3, boolean z) throws Exception {
        String sharedRecordKey = UUID.randomUUID().toString();
        HoodieKey firstPartitionKey = new HoodieKey(sharedRecordKey, "2018-01-01");
        HoodieRecord recordOne = new HoodieAvroRecord(firstPartitionKey, this.dataGen.generateRandomValue(firstPartitionKey, "001"));
        HoodieKey secondPartitionKey = new HoodieKey(sharedRecordKey, "2018-02-01");
        HoodieRecord recordTwo = new HoodieAvroRecord(secondPartitionKey, this.dataGen.generateRandomValue(secondPartitionKey, "001"));
        HoodieRecord recordThree = new HoodieAvroRecord(secondPartitionKey, this.dataGen.generateRandomValue(secondPartitionKey, "001"));
        HoodieJavaRDD inputData = HoodieJavaRDD.of(this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));

        // Global index: the key is unique across partitions, so a single record remains.
        HoodieIndex globalIndex = (HoodieIndex) Mockito.mock(HoodieIndex.class);
        Mockito.when(Boolean.valueOf(globalIndex.isGlobal())).thenReturn(true);
        List<HoodieRecord<RawTripTestPayload>> globallyDeduped = HoodieWriteHelper.newInstance().deduplicateRecords(inputData, globalIndex, 1).collectAsList();
        Assertions.assertEquals(1, globallyDeduped.size());
        // Expected value first: the surviving record must sit in the third record's partition.
        Assertions.assertEquals(recordThree.getPartitionPath(), globallyDeduped.get(0).getPartitionPath());
        assertNodupesWithinPartition(globallyDeduped);

        // Non-global index: one record per partition remains.
        HoodieIndex nonGlobalIndex = (HoodieIndex) Mockito.mock(HoodieIndex.class);
        Mockito.when(Boolean.valueOf(nonGlobalIndex.isGlobal())).thenReturn(false);
        List<HoodieRecord<RawTripTestPayload>> partitionDeduped = HoodieWriteHelper.newInstance().deduplicateRecords(inputData, nonGlobalIndex, 1).collectAsList();
        Assertions.assertEquals(2, partitionDeduped.size());
        assertNodupesWithinPartition(partitionDeduped);

        // End-to-end write with input combining enabled.
        JavaRDD<HoodieRecord> inputRecords = this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).combineInput(true, true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        // try-with-resources: a failure in close() can no longer mask a failure of the write.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfgBuilder.build())) {
            writeClient.startCommitWithTime("001");
            List<WriteStatus> statuses = function3.apply(writeClient, inputRecords, "001").collect();
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
            Assertions.assertEquals(2, statuses.size());
            // Raw cast retained: the element type returned by getWrittenRecords() is not visible here.
            assertNodupesInPartition((List) statuses.stream()
                .map(status -> status.getWrittenRecords())
                .flatMap(written -> written.stream())
                .collect(Collectors.toList()));
        }
    }

    /**
     * Asserts that no record key occurs more than once within the same partition path.
     * The same key in different partitions is allowed.
     *
     * @param list records to check, in iteration order
     */
    void assertNodupesInPartition(List<HoodieRecord> list) {
        Map<String, Set<String>> keysByPartition = new HashMap<>();
        for (HoodieRecord current : list) {
            String recordKey = current.getRecordKey();
            String partitionPath = current.getPartitionPath();
            Set<String> seenKeys = keysByPartition.computeIfAbsent(partitionPath, ignored -> new HashSet<>());
            Assertions.assertFalse(seenKeys.contains(recordKey), "key " + recordKey + " is duplicate within partition " + partitionPath);
            seenKeys.add(recordKey);
        }
    }

    /**
     * Runs the upsert scenario with marker-based rollback enabled and {@code upsert} as the
     * write operation.
     *
     * @param z forwarded to {@code addConfigsForPopulateMetaFields}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testUpserts(boolean z) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig writeConfig = cfgBuilder.build();
        testUpsertsInternal(writeConfig, (client, records, instantTime) -> client.upsert(records, instantTime), false);
    }

    /**
     * Runs the upsert scenario with marker-based rollback enabled and
     * {@code upsertPreppedRecords} as the write operation.
     *
     * @param z forwarded to {@code addConfigsForPopulateMetaFields}
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testUpsertsPrepped(boolean z) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig writeConfig = cfgBuilder.build();
        testUpsertsInternal(writeConfig, (client, records, instantTime) -> client.upsertPreppedRecords(records, instantTime), true);
    }

    /**
     * Shared body of the upsert tests. The steps, in order:
     * <ol>
     *   <li>Re-initialise the table with timeline layout {@code VERSION_0} and write with a
     *       matching config: insert 200 records at {@code "001"}, apply {@code function3} to
     *       100 records at {@code "004"}, delete 50 records at {@code "005"}.</li>
     *   <li>Switch to a client using {@code CURR_VERSION}, savepoint and restore to
     *       {@code "004"}, assert the rollback timeline has no last instant, and assert 200
     *       records are readable.</li>
     *   <li>Delete 50 records at {@code "006"} and assert the commit timeline holds exactly the
     *       five expected instants.</li>
     *   <li>Inside a Spark task, build a {@link HoodieMergeHandle} over a base file of the latest
     *       completed commit and call {@code performMergeDataValidationCheck}: it must not throw
     *       by default and must throw {@link HoodieCorruptedDataException} once
     *       {@code hoodie.merge.data.validation.enabled} is set to {@code "true"}.</li>
     * </ol>
     *
     * @param hoodieWriteConfig source of the properties, the populate-meta-fields flag and the
     *                          metadata-table flag used throughout
     * @param function3         write operation used for the update batch at {@code "004"}
     * @param z                 forwarded to {@code insertFirstBatch}, {@code updateBatch} and
     *                          {@code deleteBatch}; callers pass {@code true} for the prepped variant
     * @throws Exception if any write, restore or Spark job fails
     */
    private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig, HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function3, boolean z) throws Exception {
        // Phase 1: writes against timeline layout VERSION_0.
        HoodieWriteConfig layoutV0Config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true).withProps(hoodieWriteConfig.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue()).build();
        HoodieTableMetaClient.withPropertyBuilder().fromMetaClient(this.metaClient).setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0).setPopulateMetaFields(hoodieWriteConfig.populateMetaFields()).initTable(this.metaClient.getHadoopConf(), this.metaClient.getBasePath());
        SparkRDDWriteClient layoutV0Client = getHoodieWriteClient(layoutV0Config);
        insertFirstBatch(layoutV0Config, layoutV0Client, "001", "000", 200,
            (client, records, instantTime) -> client.insert(records, instantTime),
            z, true, 200, hoodieWriteConfig.populateMetaFields());
        updateBatch(layoutV0Config, layoutV0Client, "004", "001", Option.of(Arrays.asList("002")), "000", 100, function3, z, true, 100, 200, 2, hoodieWriteConfig.populateMetaFields());
        deleteBatch(layoutV0Config, layoutV0Client, "005", "004", "000", 50,
            (client, keys, instantTime) -> client.delete(keys, instantTime),
            z, true, 0, 150, hoodieWriteConfig.populateMetaFields());

        // Phase 2: switch to the current timeline layout, then savepoint and restore to "004".
        HoodieWriteConfig currentLayoutConfig = getConfigBuilder().withProps(hoodieWriteConfig.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).build();
        SparkRDDWriteClient currentLayoutClient = getHoodieWriteClient(currentLayoutConfig);
        currentLayoutClient.savepoint("004", "user1", "comment1");
        currentLayoutClient.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
        Assertions.assertFalse(this.metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
        String[] partitionGlobs = new String[this.dataGen.getPartitionPaths().length];
        for (int idx = 0; idx < partitionGlobs.length; idx++) {
            partitionGlobs[idx] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[idx]);
        }
        Assertions.assertEquals(200L, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count(), "Must contain 200 records");

        // Phase 3: delete again after the restore and check the resulting commit timeline.
        deleteBatch(currentLayoutConfig, currentLayoutClient, "006", "005", "000", 50,
            (client, keys, instantTime) -> client.delete(keys, instantTime),
            z, true, 0, 150);
        List commitInstants = (List) new HoodieActiveTimeline(this.metaClient, false).getCommitTimeline().getInstants().collect(Collectors.toList());
        Assertions.assertEquals(5, commitInstants.size());
        Assertions.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "001"), commitInstants.get(0));
        Assertions.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "004"), commitInstants.get(1));
        Assertions.assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, "commit", "006"), commitInstants.get(2));
        Assertions.assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, "commit", "006"), commitInstants.get(3));
        Assertions.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "006"), commitInstants.get(4));

        // Phase 4: merge-data validation check, executed inside a Spark task.
        HoodieTableMetaClient freshMetaClient = HoodieTableMetaClient.builder().setConf(this.jsc.hadoopConfiguration()).setBasePath(this.basePath).build();
        // Copied into a local so the task closure references the string rather than this.basePath.
        String tableBasePath = this.basePath;
        HoodieSparkTable hoodieTable = getHoodieTable(freshMetaClient, layoutV0Config);
        String baseFileExtension = freshMetaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        this.jsc.parallelize(Arrays.asList(1)).map(ignoredElement -> {
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
                (byte[]) freshMetaClient.getActiveTimeline().getInstantDetails(
                    (HoodieInstant) freshMetaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            // One lookup supplies both file path and partition path, so they always
            // describe the same write stat.
            HoodieWriteStat baseFileStat = commitMetadata.getPartitionToWriteStats().values().stream()
                .flatMap(stats -> stats.stream())
                .filter(stat -> stat.getPath().endsWith(baseFileExtension))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                    "No write stat with a path ending in " + baseFileExtension + " in the latest completed commit"));
            String partitionPath = baseFileStat.getPartitionPath();
            Path baseFilePath = new Path(tableBasePath, baseFileStat.getPath());
            HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());

            // Default config: the validation check must pass silently.
            try {
                HoodieMergeHandle defaultHandle = new HoodieMergeHandle(layoutV0Config, "007", hoodieTable, new HashMap(), partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(), hoodieWriteConfig.populateMetaFields() ? Option.empty() : Option.of(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieWriteConfig.getProps()))));
                WriteStatus defaultStatus = new WriteStatus(false, Double.valueOf(0.0d));
                defaultStatus.setStat(new HoodieWriteStat());
                defaultStatus.getStat().setNumWrites(0L);
                defaultHandle.performMergeDataValidationCheck(defaultStatus);
            } catch (HoodieCorruptedDataException e) {
                Assertions.fail("Exception not expected because merge validation check is disabled");
            }

            // Validation enabled: the same check with zero recorded writes must throw.
            try {
                layoutV0Config.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
                HoodieMergeHandle validatingHandle = new HoodieMergeHandle(HoodieWriteConfig.newBuilder().withProps(layoutV0Config.getProps()).build(), "006", hoodieTable, new HashMap(), partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(), hoodieWriteConfig.populateMetaFields() ? Option.empty() : Option.of(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieWriteConfig.getProps()))));
                WriteStatus validatingStatus = new WriteStatus(false, Double.valueOf(0.0d));
                validatingStatus.setStat(new HoodieWriteStat());
                validatingStatus.getStat().setNumWrites(0L);
                validatingHandle.performMergeDataValidationCheck(validatingStatus);
                Assertions.fail("The above line should have thrown an exception");
            } catch (HoodieCorruptedDataException expected) {
                // Expected outcome: the enabled validation check rejects the merge.
            }
            return true;
        }).collect();
    }

    /**
     * Builds a write config from the default builder, applies
     * {@code addConfigsForPopulateMetaFields} to it and runs
     * {@code testHoodieConcatHandle} with its boolean argument set to {@code false}.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to
     *          {@code addConfigsForPopulateMetaFields}
     * @throws Exception propagated from the shared test body
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testInsertsWithHoodieConcatHandle(boolean z) throws Exception {
        HoodieWriteConfig.Builder builder = getConfigBuilder();
        addConfigsForPopulateMetaFields(builder, z);
        HoodieWriteConfig writeConfig = builder.build();
        testHoodieConcatHandle(writeConfig, false);
    }

    /**
     * Builds a write config from the default builder, applies
     * {@code addConfigsForPopulateMetaFields} to it and runs
     * {@code testHoodieConcatHandle} with its boolean argument set to {@code true}.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to
     *          {@code addConfigsForPopulateMetaFields}
     * @throws Exception propagated from the shared test body
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testInsertsPreppedWithHoodieConcatHandle(boolean z) throws Exception {
        HoodieWriteConfig.Builder builder = getConfigBuilder();
        addConfigsForPopulateMetaFields(builder, z);
        HoodieWriteConfig writeConfig = builder.build();
        testHoodieConcatHandle(writeConfig, true);
    }

    /**
     * Shared body of the concat-handle tests. Re-initialises the table on timeline layout
     * version 0, writes a first batch at instant "001" and a second batch (generated by
     * {@code generateUniqueUpdates}) at instant "004", both through {@code insert}, with
     * {@code withMergeAllowDuplicateOnInserts(true)} set on the effective config.
     *
     * @param hoodieWriteConfig config whose properties seed the effective config and whose
     *                          {@code populateMetaFields()} value is passed to both batch helpers
     * @param z forwarded to {@code insertFirstBatch} and {@code generateWrapRecordsFn}
     *          (presumably the "prepped records" switch, judging by the calling tests)
     * @throws Exception propagated from the batch helpers
     */
    private void testHoodieConcatHandle(HoodieWriteConfig hoodieWriteConfig, boolean z) throws Exception {
        HoodieWriteConfig effectiveConfig = getConfigBuilder()
                .withProps(hoodieWriteConfig.getProps())
                .withMergeAllowDuplicateOnInserts(true)
                .withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue())
                .build();
        HoodieTableMetaClient.withPropertyBuilder()
                .fromMetaClient(this.metaClient)
                .setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0)
                .initTable(this.metaClient.getHadoopConf(), this.metaClient.getBasePath());
        SparkRDDWriteClient client = getHoodieWriteClient(effectiveConfig);

        // First batch: instant "001" on top of "000".
        insertFirstBatch(effectiveConfig, client, "001", "000", 200, SparkRDDWriteClient::insert,
                z, true, 200, hoodieWriteConfig.populateMetaFields());

        // Second batch: instant "004" on top of "001", with "002" listed as the instant in between.
        HoodieTestDataGenerator generator = this.dataGen;
        writeBatch(client, "004", "001", Option.of(Arrays.asList("002")), "000", 100,
                generateWrapRecordsFn(z, effectiveConfig, generator::generateUniqueUpdates),
                SparkRDDWriteClient::insert,
                true, 100, 300, 2, false, hoodieWriteConfig.populateMetaFields());
    }

    /**
     * Runs {@code testHoodieConcatHandleOnDupInserts} with the default config and the boolean
     * argument set to {@code false}.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testInsertsWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        HoodieWriteConfig.Builder defaultBuilder = getConfigBuilder();
        testHoodieConcatHandleOnDupInserts(defaultBuilder.build(), false);
    }

    /**
     * Runs {@code testHoodieConcatHandleOnDupInserts} with the default config and the boolean
     * argument set to {@code true}.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testInsertsPreppedWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        HoodieWriteConfig.Builder defaultBuilder = getConfigBuilder();
        testHoodieConcatHandleOnDupInserts(defaultBuilder.build(), true);
    }

    /**
     * Shared body of the duplicate-incoming-keys tests. Inserts a first batch of 50 records at
     * instant "001", then inserts 100 records produced by {@code generateUpdates} at instant
     * "004", with {@code withMergeAllowDuplicateOnInserts(true)} set on the effective config.
     *
     * @param hoodieWriteConfig config whose properties seed the effective config and whose
     *                          {@code populateMetaFields()} value is passed to both batch helpers
     * @param z forwarded to {@code insertFirstBatch} and {@code generateWrapRecordsFn}
     *          (presumably the "prepped records" switch, judging by the calling tests)
     * @throws Exception propagated from the batch helpers
     */
    private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig hoodieWriteConfig, boolean z) throws Exception {
        final int firstBatchSize = 50;
        final int secondBatchSize = 100;

        HoodieWriteConfig effectiveConfig = getConfigBuilder()
                .withProps(hoodieWriteConfig.getProps())
                .withMergeAllowDuplicateOnInserts(true)
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(effectiveConfig);

        insertFirstBatch(effectiveConfig, client, "001", "000", firstBatchSize, SparkRDDWriteClient::insert,
                z, true, firstBatchSize, hoodieWriteConfig.populateMetaFields());

        List<String> instantsInBetween = Arrays.asList("002", "003");
        HoodieTestDataGenerator generator = this.dataGen;
        writeBatch(client, "004", "001", Option.of(instantsInBetween), "000", secondBatchSize,
                generateWrapRecordsFn(z, effectiveConfig, generator::generateUpdates),
                SparkRDDWriteClient::insert,
                true, secondBatchSize, firstBatchSize + secondBatchSize, 2, false,
                hoodieWriteConfig.populateMetaFields());
    }

    /**
     * Bulk-inserts 100 generated records at instant "001" through an
     * {@code RDDCustomColumnsSortPartitioner} configured with the "rider" column and asserts
     * that none of the collected write statuses reports an error.
     *
     * <p>The write client is held in a try-with-resources statement, so it is closed on both the
     * success and the failure path. If the body fails and {@code close()} fails as well, the
     * close failure is attached as a suppressed exception instead of masking the original one.
     */
    @Test
    public void testBulkInsertWithCustomPartitioner() {
        try (SparkRDDWriteClient hoodieWriteClient =
                     getHoodieWriteClient(getConfigBuilder().withRollbackUsingMarkers(true).build())) {
            hoodieWriteClient.startCommitWithTime("001");
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                    hoodieWriteClient.bulkInsert(
                            this.jsc.parallelize(this.dataGen.generateInserts("001", 100), 10),
                            "001",
                            Option.of(new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, false)))
                            .collect());
        }
    }

    /**
     * Upserts two batches that mix inserts and deletes, using the LAZY failed-writes cleaning
     * policy.
     *
     * <ul>
     *   <li>Instant "001": 200 generated inserts followed by 100 generated deletes.</li>
     *   <li>Instant "004": delete records derived from the first 50 records of batch one,
     *       followed by records 50..99 of batch one unchanged.</li>
     * </ul>
     *
     * The numeric expectations are passed to {@code writeBatch} as-is; their exact meaning is
     * defined by that helper.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to
     *          {@code addConfigsForPopulateMetaFields} and as the last argument of {@code writeBatch}
     * @throws Exception propagated from {@code writeBatch}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testDeletes(boolean z) throws Exception {
        HoodieWriteConfig.Builder builder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        addConfigsForPopulateMetaFields(builder, z);
        SparkRDDWriteClient client = getHoodieWriteClient(builder.build());

        // Batch one is accumulated here so that batch two can slice it.
        ArrayList firstBatch = new ArrayList();
        writeBatch(client, "001", "000", Option.empty(), "000", -1,
                (instant, count) -> {
                    List inserts = this.dataGen.generateInserts(instant, 200);
                    List deletes = this.dataGen.generateDeletes(instant, 100);
                    firstBatch.addAll(inserts);
                    firstBatch.addAll(deletes);
                    return firstBatch;
                },
                SparkRDDWriteClient::upsert,
                true, 200, 200, 1, false, z);

        ArrayList secondBatch = new ArrayList();
        writeBatch(client, "004", "001", Option.empty(), "000", 100,
                (instant, count) -> {
                    List toDelete = firstBatch.subList(0, 50);
                    List toRewrite = firstBatch.subList(50, 100);
                    secondBatch.addAll(this.dataGen.generateDeletesFromExistingRecords(toDelete));
                    secondBatch.addAll(toRewrite);
                    return secondBatch;
                },
                SparkRDDWriteClient::upsert,
                true, 50, 150, 2, false, z);
    }

    /**
     * Upserts a single batch at instant "001" that contains 200 generated inserts plus delete
     * records derived from inserts 40..89 of that same batch, using the LAZY failed-writes
     * cleaning policy. The numeric expectations are passed to {@code writeBatch} as-is.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to
     *          {@code addConfigsForPopulateMetaFields} and as the last argument of {@code writeBatch}
     * @throws Exception propagated from {@code writeBatch}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testDeletesForInsertsInSameBatch(boolean z) throws Exception {
        HoodieWriteConfig.Builder builder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        addConfigsForPopulateMetaFields(builder, z);
        SparkRDDWriteClient client = getHoodieWriteClient(builder.build());

        ArrayList batch = new ArrayList();
        writeBatch(client, "001", "000", Option.empty(), "000", -1,
                (instant, count) -> {
                    List inserts = this.dataGen.generateInserts(instant, 200);
                    List insertsToDelete = inserts.subList(40, 90);
                    batch.addAll(inserts);
                    batch.addAll(this.dataGen.generateDeletesFromExistingRecords(insertsToDelete));
                    return batch;
                },
                SparkRDDWriteClient::upsert,
                true, 150, 150, 1, false, z);
    }

    /**
     * Runs {@code testUpsertsUpdatePartitionPath} with the default config and {@code upsert} as
     * the write function, once for each of the GLOBAL_BLOOM and GLOBAL_SIMPLE index types.
     *
     * @param indexType index type supplied by the {@code @EnumSource}
     * @throws Exception propagated from the shared test body
     */
    @EnumSource(value = HoodieIndex.IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
    @ParameterizedTest
    public void testUpsertsUpdatePartitionPathGlobalBloom(HoodieIndex.IndexType indexType) throws Exception {
        testUpsertsUpdatePartitionPath(
                indexType,
                getConfig(),
                (client, records, instantTime) -> client.upsert(records, instantTime));
    }

    /**
     * Writes three batches through {@code function3} and, after each one, checks that the
     * (partition path, record key) pairs read back from the table match the expected set.
     *
     * <ol>
     *   <li>Instant "001": 10 inserts; every partition touched must hold exactly one base file.</li>
     *   <li>Instant "002": 20 more inserts; at least one partition must then hold more than one
     *       base file.</li>
     *   <li>Instant "003": the batch-one records re-written under a rotated partition path
     *       (2016/03/15 -> 2015/03/16 -> 2015/03/17 -> 2016/03/15); the expected set is updated
     *       to the new partition for each of those keys.</li>
     * </ol>
     *
     * The effective config enables partition-path updates for both the bloom and the global
     * simple index, uses timeline layout version 0 and the LAZY failed-writes cleaning policy.
     *
     * @param indexType index type set on the effective config
     * @param hoodieWriteConfig config whose properties seed the effective config
     * @param function3 write operation applied to (client, records, instant time)
     * @throws Exception propagated from the write function or the table helpers
     * @throws IllegalStateException if a batch-one record sits in a partition other than the three
     *                               handled by the rotation
     */
    private void testUpsertsUpdatePartitionPath(HoodieIndex.IndexType indexType, HoodieWriteConfig hoodieWriteConfig, HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function3) throws Exception {
        HoodieWriteConfig build = getConfigBuilder().withProps(hoodieWriteConfig.getProps()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000L).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).withBloomIndexUpdatePartitionPath(true).withGlobalSimpleIndexUpdatePartitionPath(true).build()).withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue()).build();
        HoodieTableMetaClient.withPropertyBuilder().fromMetaClient(this.metaClient).setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0).initTable(this.metaClient.getHadoopConf(), this.metaClient.getBasePath());
        build.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name());
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);

        // Batch 1: plain inserts.
        hoodieWriteClient.startCommitWithTime("001");
        List<HoodieRecord> firstInserts = this.dataGen.generateInserts("001", 10);
        Set<Pair<String, String>> expectedPartitionAndKey = new HashSet<>();
        for (HoodieRecord inserted : firstInserts) {
            expectedPartitionAndKey.add(Pair.of(inserted.getPartitionPath(), inserted.getRecordKey()));
        }
        function3.apply(hoodieWriteClient, this.jsc.parallelize(firstInserts, 1), "001").collect();
        assertPartitionPathRecordKeys(expectedPartitionAndKey, getFullPartitionPaths());

        // Only the partitions that actually received records are inspected here.
        String[] writtenPartitions = expectedPartitionAndKey.stream().map(Pair::getLeft).toArray(String[]::new);
        Map<String, Long> baseFileCounts = FileCreateUtils.getBaseFileCountsForPaths(this.basePath, this.fs, getFullPartitionPaths(writtenPartitions));
        for (Long baseFileCount : baseFileCounts.values()) {
            Assertions.assertEquals(1L, baseFileCount);
        }

        // Batch 2: more inserts.
        hoodieWriteClient.startCommitWithTime("002");
        List<HoodieRecord> secondInserts = this.dataGen.generateInserts("002", 20);
        for (HoodieRecord inserted : secondInserts) {
            expectedPartitionAndKey.add(Pair.of(inserted.getPartitionPath(), inserted.getRecordKey()));
        }
        function3.apply(hoodieWriteClient, this.jsc.parallelize(secondInserts, 1), "002").collect();
        String[] fullPartitionPaths = getFullPartitionPaths();
        assertPartitionPathRecordKeys(expectedPartitionAndKey, fullPartitionPaths);
        Map<String, Long> countsAfterSecondBatch = FileCreateUtils.getBaseFileCountsForPaths(this.basePath, this.fs, fullPartitionPaths);
        Assertions.assertTrue(countsAfterSecondBatch.values().stream().anyMatch(baseFileCount -> baseFileCount > 1),
                "At least one partition should have more than 1 base file after 2nd batch of writes");

        // Batch 3: re-write the batch-one records under a rotated partition path.
        List<HoodieRecord> movedRecords = new ArrayList<>();
        for (HoodieRecord original : firstInserts) {
            expectedPartitionAndKey.remove(Pair.of(original.getPartitionPath(), original.getRecordKey()));
            String oldPartitionPath = original.getPartitionPath();
            String newPartitionPath;
            if (oldPartitionPath.equalsIgnoreCase("2016/03/15")) {
                newPartitionPath = "2015/03/16";
            } else if (oldPartitionPath.equalsIgnoreCase("2015/03/16")) {
                newPartitionPath = "2015/03/17";
            } else if (oldPartitionPath.equalsIgnoreCase("2015/03/17")) {
                newPartitionPath = "2016/03/15";
            } else {
                throw new IllegalStateException("Unknown partition path " + original.getPartitionPath());
            }
            movedRecords.add(new HoodieAvroRecord(new HoodieKey(original.getRecordKey(), newPartitionPath), (HoodieRecordPayload) original.getData()));
            expectedPartitionAndKey.add(Pair.of(newPartitionPath, original.getRecordKey()));
        }
        function3.apply(hoodieWriteClient, this.jsc.parallelize(movedRecords, 1), "003").collect();
        assertPartitionPathRecordKeys(expectedPartitionAndKey, getFullPartitionPaths());
    }

    /**
     * Reads all rows under the given paths and asserts that their (partition path, record key)
     * pairs match the expected set.
     *
     * @param set expected (partition path, record key) pairs
     * @param strArr paths handed to {@code getAllRows}
     */
    private void assertPartitionPathRecordKeys(Set<Pair<String, String>> set, String[] strArr) {
        List<Pair<String, String>> actualPairs = getActualPartitionPathAndRecordKeys(getAllRows(strArr));
        assertActualAndExpectedPartitionPathRecordKeyMatches(set, actualPairs);
    }

    /**
     * Collects the dataset and extracts, for every row, the pair of its
     * {@code _hoodie_partition_path} and {@code _row_key} column values.
     *
     * @param dataset rows to collect
     * @return one (partition path, record key) pair per row, in collection order
     */
    private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<Row> dataset) {
        List<Pair<String, String>> partitionAndKeyPairs = new ArrayList<>();
        for (Row row : dataset.collectAsList()) {
            partitionAndKeyPairs.add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
        }
        return partitionAndKeyPairs;
    }

    /**
     * Reads the given paths through {@code HoodieClientTestUtils.read} using this test's Spark
     * context, base path, SQL context and file system.
     *
     * @param strArr paths to read
     * @return the dataset returned by {@code HoodieClientTestUtils.read}
     */
    private Dataset<Row> getAllRows(String[] strArr) {
        Dataset<Row> rows = HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, strArr);
        return rows;
    }

    /**
     * Expands every partition path known to the data generator into a glob under the base path.
     *
     * @return the result of {@code getFullPartitionPaths(String[])} for
     *         {@code dataGen.getPartitionPaths()}
     */
    private String[] getFullPartitionPaths() {
        String[] generatorPartitions = this.dataGen.getPartitionPaths();
        return getFullPartitionPaths(generatorPartitions);
    }

    /**
     * Turns each relative partition path into the glob {@code <basePath>/<partition>/*}.
     *
     * @param strArr relative partition paths
     * @return a new array of the same length holding the formatted globs, in input order
     */
    private String[] getFullPartitionPaths(String[] strArr) {
        return Arrays.stream(strArr)
                .map(partition -> String.format("%s/%s/*", this.basePath, partition))
                .toArray(String[]::new);
    }

    /**
     * Asserts that the expected set and the actual list have the same size and contain each
     * other's elements.
     *
     * @param set expected (partition path, record key) pairs
     * @param list actual (partition path, record key) pairs
     */
    private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> set, List<Pair<String, String>> list) {
        Assertions.assertEquals(set.size(), list.size());
        // Every actual pair must be expected ...
        for (Pair<String, String> actualPair : list) {
            Assertions.assertTrue(set.contains(actualPair));
        }
        // ... and every expected pair must actually be present.
        for (Pair<String, String> expectedPair : set) {
            Assertions.assertTrue(list.contains(expectedPair));
        }
    }

    /**
     * Starts a commit at the given instant, upserts freshly generated inserts and checks the
     * resulting write statuses.
     *
     * @param sparkRDDWriteClient client used to start the commit and upsert
     * @param str instant time of the commit
     * @param num number of records requested from {@code dataGen.generateInserts}
     * @param i expected number of write statuses
     * @return pair of (collected write statuses, generated records)
     */
    private Pair<List<WriteStatus>, List<HoodieRecord>> insertBatchRecords(SparkRDDWriteClient sparkRDDWriteClient, String str, Integer num, int i) {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> records = this.dataGen.generateInserts(str, num);
        List<WriteStatus> statuses = sparkRDDWriteClient.upsert(this.jsc.parallelize(records, 1), str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
        Assertions.assertEquals(i, statuses.size(), "check expect statue size.");
        return Pair.of(statuses, records);
    }

    /**
     * Checks that updating records whose file groups are part of a requested replace instant is
     * rejected with {@code HoodieUpsertException}, while inserts keep working.
     *
     * <ol>
     *   <li>Insert 600 records at "001" and expect two file groups.</li>
     *   <li>Create a requested replace instant "002" over all file slices of those groups.</li>
     *   <li>Insert one record at "003" and expect a third file group.</li>
     *   <li>Start "004" and expect an upsert of updates to the first batch to throw.</li>
     *   <li>Insert one record at "005" and expect it to land in the file group created in step 3,
     *       with the file-group count still at three.</li>
     * </ol>
     *
     * @throws IOException declared by the original test
     */
    @Test
    public void testUpdateRejectForClustering() throws IOException {
        final String partition = "2016/09/26";
        this.dataGen = new HoodieTestDataGenerator(new String[]{partition});
        Properties clusteringProps = new Properties();
        clusteringProps.setProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), "true");
        String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig config = getSmallInsertWriteConfig(100, schema, this.dataGen.getEstimatedFileSizeInBytes(150), true, clusteringProps);
        SparkRDDWriteClient client = getHoodieWriteClient(config);
        HoodieSparkCopyOnWriteTable table = HoodieSparkTable.create(config, this.context, this.metaClient);

        // Step 1: first insert batch creates two file groups.
        List firstInserts = (List) insertBatchRecords(client, "001", 600, 2).getValue();
        List<String> initialFileIds = table.getFileSystemView().getAllFileGroups(partition)
                .map(group -> group.getFileGroupId().getFileId())
                .collect(Collectors.toList());
        Assertions.assertEquals(2, initialFileIds.size());

        // Step 2: put every file slice of those groups under a requested replace instant.
        List slicesPerGroup = (List) table.getFileSystemView().getAllFileGroups(partition)
                .map(group -> (List) group.getAllFileSlices().collect(Collectors.toList()))
                .collect(Collectors.toList());
        createRequestedReplaceInstant(this.metaClient, "002", (List[]) slicesPerGroup.toArray(new List[slicesPerGroup.size()]));

        // Step 3: a new insert must open a third file group.
        String upsertInstant = "003";
        insertBatchRecords(client, "003", 1, 1).getKey();
        List<String> fileIdsAfterSecondInsert = table.getFileSystemView().getAllFileGroups(partition)
                .map(group -> group.getFileGroupId().getFileId())
                .collect(Collectors.toList());
        Assertions.assertEquals(3, fileIdsAfterSecondInsert.size());

        // Step 4: updates against the file groups under the replace instant must be rejected.
        client.startCommitWithTime("004");
        List updatesToFirstBatch = new ArrayList(this.dataGen.generateUpdates("004", firstInserts));
        // NOTE(review): the upsert goes through this.writeClient with instant "003", not through
        // the local client with the "004" instant started above — confirm this is intended.
        Assertions.assertThrows(HoodieUpsertException.class, () -> {
            this.writeClient.upsert(this.jsc.parallelize(updatesToFirstBatch, 1), upsertInstant).collect();
        }, String.format("Not allowed to update the clustering files in partition: %s For pending clustering operations, we are not going to support update for now.", partition));

        // Step 5: a further insert lands in the file group that is not under the replace instant.
        List<WriteStatus> lastStatuses = insertBatchRecords(client, "005", 1, 1).getKey();
        fileIdsAfterSecondInsert.removeAll(initialFileIds);
        Assertions.assertEquals(fileIdsAfterSecondInsert.get(0), lastStatuses.get(0).getFileId());
        List<String> finalFileIds = table.getFileSystemView().getAllFileGroups(partition)
                .map(group -> group.getFileGroupId().getFileId())
                .collect(Collectors.toList());
        Assertions.assertEquals(3, finalFileIds.size());
    }

    /**
     * Exercises small-file handling on the upsert path with three commits into the single
     * partition "2016/09/26".
     *
     * <ol>
     *   <li>"001": 100 inserts; expects one file holding 100 records.</li>
     *   <li>"002": 40 inserts plus updates to the first batch; expects the same file id, previous
     *       commit "001", 140 records, all stamped with commit "002".</li>
     *   <li>"003": 200 inserts plus updates to the second batch; expects two write statuses. The
     *       latest base files are then scanned to count, among records stamped "003", how many
     *       are updates of batch-two keys and how many are new inserts.</li>
     * </ol>
     *
     * @throws Exception propagated from the write client or file utilities
     */
    @Test
    public void testSmallInsertHandlingForUpserts() throws Exception {
        String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        // The config is built before dataGen is replaced, so the size estimate comes from the
        // generator that was in place on entry.
        HoodieWriteConfig config = getSmallInsertWriteConfig(100, schema, this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = getHoodieWriteClient(config);
        BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.metaClient);

        // Commit 1: inserts only.
        client.startCommitWithTime("001");
        List firstInserts = this.dataGen.generateInserts("001", 100);
        Set firstKeys = Transformations.recordsToRecordKeySet(firstInserts);
        List<WriteStatus> firstStatuses = client.upsert(this.jsc.parallelize(firstInserts, 1), "001").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(firstStatuses);
        Assertions.assertEquals(1, firstStatuses.size(), "Just 1 file needs to be added.");
        String fileId = firstStatuses.get(0).getFileId();
        Assertions.assertEquals(100, fileUtils.readRowKeys(this.hadoopConf, new Path(this.basePath, firstStatuses.get(0).getStat().getPath())).size(), "file should contain 100 records");

        // Commit 2: new inserts plus updates to everything from commit 1.
        client.startCommitWithTime("002");
        List secondInserts = this.dataGen.generateInserts("002", 40);
        Set secondKeys = Transformations.recordsToRecordKeySet(secondInserts);
        List secondBatch = new ArrayList(secondInserts);
        secondBatch.addAll(this.dataGen.generateUpdates("002", firstInserts));
        List<WriteStatus> secondStatuses = client.upsert(this.jsc.parallelize(secondBatch, 1), "002").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(secondStatuses);
        Assertions.assertEquals(1, secondStatuses.size(), "Just 1 file needs to be updated.");
        WriteStatus expandedStatus = secondStatuses.get(0);
        Assertions.assertEquals(fileId, expandedStatus.getFileId(), "Existing file should be expanded");
        Assertions.assertEquals("001", expandedStatus.getStat().getPrevCommit(), "Existing file should be expanded");
        Path expandedFile = new Path(this.basePath, expandedStatus.getStat().getPath());
        Assertions.assertEquals(140, fileUtils.readRowKeys(this.hadoopConf, expandedFile).size(), "file should contain 140 records");
        for (GenericRecord row : fileUtils.readAvroRecords(this.hadoopConf, expandedFile)) {
            String recordKey = row.get("_hoodie_record_key").toString();
            Assertions.assertEquals("002", row.get("_hoodie_commit_time").toString(), "only expect commit2");
            Assertions.assertTrue(secondKeys.contains(recordKey) || firstKeys.contains(recordKey), "key expected to be part of commit2");
        }

        // Commit 3: new inserts plus updates to the inserts of commit 2.
        client.startCommitWithTime("003");
        List thirdBatch = this.dataGen.generateInserts("003", 200);
        // The key set is taken before the updates are appended, so it holds the new inserts only.
        Set thirdInsertKeys = Transformations.recordsToRecordKeySet(thirdBatch);
        thirdBatch.addAll(this.dataGen.generateUpdates("003", secondInserts));
        List<WriteStatus> thirdStatuses = client.upsert(this.jsc.parallelize(thirdBatch, 1), "003").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(thirdStatuses);
        Assertions.assertEquals(2, thirdStatuses.size(), "2 files needs to be committed.");

        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        List<HoodieBaseFile> latestFiles = (List) getHoodieTable(reloadedMetaClient, config).getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn("2016/09/26", "003").collect(Collectors.toList());
        int insertCount = 0;
        int updateCount = 0;
        for (HoodieBaseFile baseFile : latestFiles) {
            if (baseFile.getFileName().contains(fileId)) {
                Assertions.assertEquals("003", baseFile.getCommitTime(), "Existing file should be expanded");
                for (GenericRecord row : fileUtils.readAvroRecords(this.hadoopConf, new Path(baseFile.getPath()))) {
                    String recordKey = row.get("_hoodie_record_key").toString();
                    if (!row.get("_hoodie_commit_time").toString().equals("003")) {
                        continue;
                    }
                    // A commit-3 record is an update if its key came from commit 2, else an insert.
                    if (secondKeys.remove(recordKey)) {
                        updateCount++;
                    } else {
                        insertCount++;
                    }
                }
                Assertions.assertEquals(0, secondKeys.size(), "All keys added in commit 2 must be updated in commit3 correctly");
            } else {
                Assertions.assertEquals("003", baseFile.getCommitTime(), "New file must be written for commit 3");
                List<GenericRecord> rows = fileUtils.readAvroRecords(this.hadoopConf, new Path(baseFile.getPath()));
                for (GenericRecord row : rows) {
                    String recordKey = row.get("_hoodie_record_key").toString();
                    Assertions.assertEquals("003", row.get("_hoodie_commit_time").toString(), "only expect commit3");
                    Assertions.assertTrue(thirdInsertKeys.contains(recordKey), "key expected to be part of commit3");
                }
                insertCount += rows.size();
            }
        }
        Assertions.assertEquals(updateCount, secondInserts.size(), "Total updates in commit3 must add up");
        Assertions.assertEquals(insertCount, thirdInsertKeys.size(), "Total inserts in commit3 must add up");
    }

    /**
     * Exercises small-file handling on the insert path with three commits into the single
     * partition "2016/09/26".
     *
     * <ol>
     *   <li>"001": 100 inserts; expects partition metadata and one file holding 100 records.</li>
     *   <li>"002": 40 inserts; expects the same file id, previous commit "001" and 140 records
     *       stamped with either "001" or "002".</li>
     *   <li>"003": 200 inserts; expects two write statuses whose files together hold 340 records,
     *       and two latest base files, both at commit "003", holding all generated records.</li>
     * </ol>
     *
     * @param z value supplied by {@code smallInsertHandlingParams}; passed as the third argument
     *          of {@code getSmallInsertWriteConfig}
     * @throws Exception propagated from the write client or file utilities
     */
    @MethodSource({"smallInsertHandlingParams"})
    @ParameterizedTest
    public void testSmallInsertHandlingForInserts(boolean z) throws Exception {
        HoodieWriteConfig config = getSmallInsertWriteConfig(100, false, z);
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = getHoodieWriteClient(config);
        BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.metaClient);

        // Commit 1.
        client.startCommitWithTime("001");
        List firstInserts = this.dataGen.generateInserts("001", 100);
        Set firstKeys = Transformations.recordsToRecordKeySet(firstInserts);
        List<WriteStatus> firstStatuses = client.insert(this.jsc.parallelize(firstInserts, 1), "001").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(firstStatuses);
        assertPartitionMetadata(new String[]{"2016/09/26"}, this.fs);
        Assertions.assertEquals(1, firstStatuses.size(), "Just 1 file needs to be added.");
        String fileId = firstStatuses.get(0).getFileId();
        Assertions.assertEquals(100, fileUtils.readRowKeys(this.hadoopConf, new Path(this.basePath, firstStatuses.get(0).getStat().getPath())).size(), "file should contain 100 records");

        // Commit 2.
        client.startCommitWithTime("002");
        List secondInserts = this.dataGen.generateInserts("002", 40);
        Set secondKeys = Transformations.recordsToRecordKeySet(secondInserts);
        List<WriteStatus> secondStatuses = client.insert(this.jsc.parallelize(secondInserts, 1), "002").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(secondStatuses);
        Assertions.assertEquals(1, secondStatuses.size(), "Just 1 file needs to be updated.");
        WriteStatus expandedStatus = secondStatuses.get(0);
        Assertions.assertEquals(fileId, expandedStatus.getFileId(), "Existing file should be expanded");
        Assertions.assertEquals("001", expandedStatus.getStat().getPrevCommit(), "Existing file should be expanded");
        Path expandedFile = new Path(this.basePath, expandedStatus.getStat().getPath());
        Assertions.assertEquals(140, fileUtils.readRowKeys(this.hadoopConf, expandedFile).size(), "file should contain 140 records");
        for (GenericRecord row : fileUtils.readAvroRecords(this.hadoopConf, expandedFile)) {
            String recordKey = row.get("_hoodie_record_key").toString();
            String commitTime = row.get("_hoodie_commit_time").toString();
            Assertions.assertTrue("001".equals(commitTime) || "002".equals(commitTime), "Record expected to be part of commit 1 or commit2");
            Assertions.assertTrue(secondKeys.contains(recordKey) || firstKeys.contains(recordKey), "key expected to be part of commit 1 or commit2");
        }

        // Commit 3.
        client.startCommitWithTime("003");
        List thirdInserts = this.dataGen.generateInserts("003", 200);
        List<WriteStatus> thirdStatuses = client.insert(this.jsc.parallelize(thirdInserts, 1), "003").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(thirdStatuses);
        Assertions.assertEquals(2, thirdStatuses.size(), "2 files needs to be committed.");
        int keysInFirstFile = fileUtils.readRowKeys(this.hadoopConf, new Path(this.basePath, thirdStatuses.get(0).getStat().getPath())).size();
        int keysInSecondFile = fileUtils.readRowKeys(this.hadoopConf, new Path(this.basePath, thirdStatuses.get(1).getStat().getPath())).size();
        Assertions.assertEquals(340, keysInFirstFile + keysInSecondFile, "file should contain 340 records");

        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        List<HoodieBaseFile> latestFiles = (List) getHoodieTable(reloadedMetaClient, config).getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn("2016/09/26", "003").collect(Collectors.toList());
        Assertions.assertEquals(2, latestFiles.size(), "Total of 2 valid data files");
        int totalRecords = 0;
        for (HoodieBaseFile baseFile : latestFiles) {
            Assertions.assertEquals("003", baseFile.getCommitTime(), "All files must be at commit 3");
            totalRecords += fileUtils.readAvroRecords(this.hadoopConf, new Path(baseFile.getPath())).size();
        }
        int totalGenerated = firstInserts.size() + secondInserts.size() + thirdInserts.size();
        Assertions.assertEquals(totalRecords, totalGenerated, "Total number of records must add up");
    }

    /**
     * Exercises the delete API on a table with the single partition {@code 2016/09/26}.
     *
     * <p>Commit 001 upserts 100 generated inserts through one Spark partition and expects a single
     * write status whose file holds 100 row keys. Commits 002-005 and 007 alternate
     * {@code testDeletes} and {@code testUpdates} rounds against that file id. Commit 006 issues a
     * delete for keys taken from freshly generated records that were never upserted in this test,
     * and expects no write statuses and an unchanged total of 150 records.
     */
    @Test
    public void testDeletesWithDeleteApi() throws Exception {
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        // The file-size estimate is taken from the data generator that is current BEFORE it is replaced below.
        HoodieWriteConfig writeConfig = getSmallInsertWriteConfig(100, schema, this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Commit 001: seed the table with 100 inserts.
        client.startCommitWithTime("001");
        List<HoodieRecord> seedRecords = this.dataGen.generateInserts("001", 100);
        ArrayList knownKeys = new ArrayList(Transformations.recordsToRecordKeySet(seedRecords));
        List seedStatuses = client.upsert(this.jsc.parallelize(seedRecords, 1), "001").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(seedStatuses);
        Assertions.assertEquals(1, seedStatuses.size(), "Just 1 file needs to be added.");
        WriteStatus seedStatus = (WriteStatus) seedStatuses.get(0);
        String seedFileId = seedStatus.getFileId();
        Path seedFilePath = new Path(this.basePath, seedStatus.getStat().getPath());
        Assertions.assertEquals(100, BaseFileUtils.getInstance(this.metaClient).readRowKeys(this.hadoopConf, seedFilePath).size(), "file should contain 100 records");

        // Commits 002-005: alternate delete and update rounds, accumulating the keys returned by each update.
        testDeletes(client, seedRecords, 20, seedFileId, "002", 80, knownKeys);
        Pair<Set<String>, List<HoodieRecord>> firstUpdate = testUpdates("003", client, 40, 120);
        knownKeys.addAll((Collection) firstUpdate.getLeft());
        testDeletes(client, (List) firstUpdate.getRight(), 10, seedFileId, "004", 110, knownKeys);
        Pair<Set<String>, List<HoodieRecord>> secondUpdate = testUpdates("005", client, 40, 150);
        knownKeys.addAll((Collection) secondUpdate.getLeft());

        // Commit 006: delete keys of records that were generated here but never upserted.
        client.startCommitWithTime("006");
        List<HoodieRecord> neverWritten = this.dataGen.generateInserts("006", 20);
        List deleteStatuses = client.delete(this.jsc.parallelize(Transformations.randomSelectAsHoodieKeys(neverWritten, 20), 1), "006").collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(deleteStatuses);
        Assertions.assertEquals(0, deleteStatuses.size(), "Just 0 write status for delete.");

        // Build one "<basePath>/<partition>/*" glob per partition and re-count the table.
        String[] partitionGlobs = new String[this.dataGen.getPartitionPaths().length];
        for (int idx = 0; idx < partitionGlobs.length; idx++) {
            partitionGlobs[idx] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[idx]);
        }
        long totalRecords = HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count();
        Assertions.assertEquals(150L, totalRecords, "Must contain 150 records");

        // Commit 007: one more delete round on the records returned by the second update.
        testDeletes(client, (List) secondUpdate.getRight(), 10, seedFileId, "007", 140, knownKeys);
    }

    /**
     * Runs the insert-then-cluster flow with inline clustering enabled and the SQL equality
     * pre-commit validator configured with {@code COUNT_SQL_QUERY_FOR_VALIDATION}.
     *
     * @param z  forwarded to {@code testInsertAndClustering} as its second argument
     * @param z2 value passed to {@code withPreserveHoodieCommitMetadata}
     */
    @MethodSource({"populateMetaFieldsAndPreserveMetadataParams"})
    @ParameterizedTest
    public void testSimpleClustering(boolean z, boolean z2) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(true)
                .withPreserveHoodieCommitMetadata(Boolean.valueOf(z2))
                .build();
        String validatorClass = SqlQueryEqualityPreCommitValidator.class.getName();
        testInsertAndClustering(clusteringConfig, z, true, false, validatorClass, COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    /**
     * Checks the count of inflight/requested instants around a regular commit when a replace
     * commit is still pending.
     *
     * <p>The insert-and-cluster helper is invoked with its third argument {@code false}. An
     * uncommitted upsert is then issued (auto-commit off), after which two inflight/requested
     * instants are expected. The same instant time is started again, upserted and committed, and
     * only one inflight/requested instant is expected afterwards.
     * NOTE(review): presumably restarting the instant rolls back the first attempt - confirm.
     */
    @Test
    public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(true)
                .withPreserveHoodieCommitMetadata(true)
                .build();
        testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");

        SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder().withAutoCommit(false).build());
        String instantTime = HoodieActiveTimeline.createNewInstantTime();

        // First attempt: written but never committed.
        List firstAttempt = this.dataGen.generateInserts(instantTime, 200);
        client.startCommitWithTime(instantTime);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(firstAttempt, 2), instantTime).collect());
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());

        // Second attempt: same instant time, this time committed explicitly.
        List secondAttempt = this.dataGen.generateInserts(instantTime, 200);
        client.startCommitWithTime(instantTime);
        JavaRDD writeResult = client.upsert(this.jsc.parallelize(secondAttempt, 2), instantTime);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeResult.collect());
        client.commit(instantTime, writeResult);
        tableMetaClient.reloadActiveTimeline();
        Assertions.assertEquals(1, tableMetaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
    }

    /**
     * Writes two insert batches, then commits a third batch with inline clustering disabled and
     * schedule-inline-clustering set to {@code z}.
     *
     * @param z value passed to {@code withScheduleInlineClustering}; exactly one pending
     *          clustering plan is expected when {@code true}, none when {@code false}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInlineScheduleClustering(boolean z) throws IOException {
        testInsertTwoBatches(true);

        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(false)
                .withScheduleInlineClustering(Boolean.valueOf(z))
                .withPreserveHoodieCommitMetadata(true)
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false).withClusteringConfig(clusteringConfig).withProps(getPropertiesForKeyGen()).build());

        this.dataGen = new HoodieTestDataGenerator(new String[]{"2015/03/16"});
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        List inserts = this.dataGen.generateInserts(instantTime, 200);
        client.startCommitWithTime(instantTime);
        JavaRDD writeResult = client.upsert(this.jsc.parallelize(inserts, 2), instantTime);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeResult.collect());
        client.commit(instantTime, writeResult);

        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        List pendingPlans = (List) ClusteringUtils.getAllPendingClusteringPlans(tableMetaClient).collect(Collectors.toList());
        Assertions.assertEquals(z ? 1 : 0, pendingPlans.size());
    }

    /**
     * Runs the insert-then-cluster flow with a clustering sort column configured.
     *
     * @param z  selects the sort column ({@code _hoodie_record_key} when true, {@code _row_key}
     *           otherwise) and is forwarded to {@code testInsertAndClustering}
     * @param z2 value passed to {@code withPreserveHoodieCommitMetadata}
     */
    @MethodSource({"populateMetaFieldsAndPreserveMetadataParams"})
    @ParameterizedTest
    public void testClusteringWithSortColumns(boolean z, boolean z2) throws Exception {
        String sortColumn = z ? "_hoodie_record_key" : "_row_key";
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringSortColumns(sortColumn)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(true)
                .withPreserveHoodieCommitMetadata(Boolean.valueOf(z2))
                .build();
        String validatorClass = SqlQueryEqualityPreCommitValidator.class.getName();
        testInsertAndClustering(clusteringConfig, z, true, false, validatorClass, COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    /**
     * Runs the insert-then-cluster flow with the single-file sort plan and execution strategies,
     * sorting on {@code begin_lat,begin_lon}. The fourth argument to
     * {@code testInsertAndClustering} is {@code true}, which enables the file-group comparison
     * performed in {@code testClustering}.
     *
     * @param z  forwarded to {@code testInsertAndClustering} as its second argument
     * @param z2 value passed to {@code withPreserveHoodieCommitMetadata}
     */
    @MethodSource({"populateMetaFieldsAndPreserveMetadataParams"})
    @ParameterizedTest
    public void testClusteringWithSortOneFilePerGroup(boolean z, boolean z2) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringSortColumns("begin_lat,begin_lon")
                .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
                .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withPreserveHoodieCommitMetadata(Boolean.valueOf(z2))
                .build();
        String validatorClass = SqlQueryEqualityPreCommitValidator.class.getName();
        testInsertAndClustering(clusteringConfig, z, true, true, validatorClass, COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    /**
     * Covers rollback of a pending clustering instant.
     *
     * <p>Steps: run insert + clustering with the third helper argument {@code false} and expect one
     * pending clustering plan; expect a further write to throw {@code HoodieUpsertException};
     * roll back the pending instant and expect no pending plans; delete the resulting rollback
     * commit, re-create the requested replace commit, attempt clustering again and verify that the
     * last rollback instant still carries the timestamp recorded earlier.
     */
    @Test
    public void testPendingClusteringRollback() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(true)
                .build();
        List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, true, false);

        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        List pendingPlans = (List) ClusteringUtils.getAllPendingClusteringPlans(tableMetaClient).collect(Collectors.toList());
        Assertions.assertEquals(1, pendingPlans.size());
        HoodieInstant pendingClusteringInstant = (HoodieInstant) ((Pair) pendingPlans.get(0)).getLeft();

        HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        addConfigsForPopulateMetaFields(writeConfigBuilder, true);
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfigBuilder.build());

        // A new batch written while the clustering is pending is expected to be rejected.
        this.dataGen = new HoodieTestDataGenerator();
        String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateInserts(newInstantTime, 200));
        Assertions.assertThrows(HoodieUpsertException.class, () -> writeAndVerifyBatch(client, allRecords, newInstantTime, true));

        // Roll back the pending clustering; no pending plan may remain.
        client.rollback(pendingClusteringInstant.getTimestamp());
        tableMetaClient.reloadActiveTimeline();
        Assertions.assertEquals(0L, ClusteringUtils.getAllPendingClusteringPlans(tableMetaClient).count());

        // Remember the rollback instant, remove its commit and re-create the requested replace commit.
        HoodieInstant rollbackInstant = (HoodieInstant) tableMetaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        FileCreateUtils.deleteRollbackCommit(tableMetaClient.getBasePath(), rollbackInstant.getTimestamp());
        tableMetaClient.reloadActiveTimeline();
        HoodieRequestedReplaceMetadata requestedReplace = HoodieRequestedReplaceMetadata.newBuilder()
                .setClusteringPlan(ClusteringTestUtils.createClusteringPlan(tableMetaClient, pendingClusteringInstant.getTimestamp(), "1"))
                .setOperationType(WriteOperationType.CLUSTER.name())
                .build();
        FileCreateUtils.createRequestedReplaceCommit(tableMetaClient.getBasePath(), pendingClusteringInstant.getTimestamp(), Option.of(requestedReplace));

        try {
            client.cluster(pendingClusteringInstant.getTimestamp(), false);
        } catch (Exception ignored) {
            // Deliberately best-effort: any failure of this clustering attempt is ignored because
            // the assertion below only inspects the rollback timeline.
        }

        tableMetaClient.reloadActiveTimeline();
        HoodieInstant latestRollback = (HoodieInstant) tableMetaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        Assertions.assertEquals(rollbackInstant.getTimestamp(), latestRollback.getTimestamp());
    }

    /**
     * Verifies what happens to an inflight clustering instant when updates are written with the
     * {@code SparkAllowUpdateStrategy} clustering update strategy.
     *
     * <p>After insert + clustering (third helper argument {@code false}) exactly one pending
     * clustering plan in state {@code INFLIGHT} is expected. A batch of updates is then written;
     * afterwards the number of pending plans must be 0 when the write config reports
     * {@code isRollbackPendingClustering()} and 1 otherwise.
     *
     * @param z value passed to {@code withRollbackPendingClustering}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean z) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").withRollbackPendingClustering(Boolean.valueOf(z)).withInlineClustering(true).withInlineClusteringNumCommits(1).build();
        List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, true, false);
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        List pendingPlans = (List) ClusteringUtils.getAllPendingClusteringPlans(tableMetaClient).collect(Collectors.toList());
        Assertions.assertEquals(1, pendingPlans.size());
        // Expected value goes first so that a failure message reports expected/actual correctly.
        Assertions.assertEquals(HoodieInstant.State.INFLIGHT, ((HoodieInstant) ((Pair) pendingPlans.get(0)).getLeft()).getState());

        HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        addConfigsForPopulateMetaFields(writeConfigBuilder, true);
        writeConfigBuilder.withClusteringConfig(clusteringConfig);
        HoodieWriteConfig writeConfig = writeConfigBuilder.build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateUpdates(instantTime, 200));
        writeAndVerifyBatch(client, allRecords, instantTime, true);

        tableMetaClient.reloadActiveTimeline();
        int expectedPendingPlans = writeConfig.isRollbackPendingClustering() ? 0 : 1;
        Assertions.assertEquals(expectedPendingPlans, ((List) ClusteringUtils.getAllPendingClusteringPlans(tableMetaClient).collect(Collectors.toList())).size());
    }

    /**
     * Expects the insert-then-cluster flow to fail with {@code HoodieValidationException} when
     * {@code FailingPreCommitValidator} is configured as the pre-commit validator.
     */
    @Test
    public void testClusteringWithFailingValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
        // assertThrows replaces the try / fail / empty-catch idiom and keeps the original failure message.
        Assertions.assertThrows(
                HoodieValidationException.class,
                () -> testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""),
                "expected pre-commit clustering validation to fail");
    }

    /**
     * Expects the insert-then-cluster flow to fail with {@code HoodieValidationException} when the
     * SQL equality pre-commit validator is configured with an empty query string.
     */
    @Test
    public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
        // assertThrows replaces the try / fail / empty-catch idiom and keeps the original failure message.
        Assertions.assertThrows(
                HoodieValidationException.class,
                () -> testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""),
                "expected pre-commit clustering validation to fail because sql query is not configured");
    }

    /**
     * Runs the insert-then-cluster flow with the single-result SQL pre-commit validator configured
     * with {@code "select count(*) from <TABLE_NAME>#400"}; the flow is expected to finish without
     * throwing.
     */
    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
                .withClusteringMaxNumGroups(10)
                .withClusteringTargetPartitions(0)
                .withInlineClusteringNumCommits(1)
                .withInlineClustering(true)
                .build();
        String validatorClass = SqlQuerySingleResultPreCommitValidator.class.getName();
        testInsertAndClustering(clusteringConfig, false, true, false, validatorClass, "", "select count(*) from <TABLE_NAME>#400");
    }

    /**
     * Expects the insert-then-cluster flow to fail with {@code HoodieValidationException} when the
     * single-result SQL pre-commit validator is configured with
     * {@code "select count(*) from <TABLE_NAME>#802"}.
     */
    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
        // assertThrows replaces the try / fail / empty-catch idiom and keeps the original failure message.
        Assertions.assertThrows(
                HoodieValidationException.class,
                () -> testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), "", "select count(*) from <TABLE_NAME>#802"),
                "expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
    }

    /**
     * Convenience overload of the seven-argument {@code testInsertAndClustering}: delegates with
     * the fourth boolean set to {@code false} and all three string arguments empty.
     *
     * @param hoodieClusteringConfig clustering configuration forwarded unchanged
     * @param z  forwarded unchanged as the second argument
     * @param z2 forwarded unchanged as the third argument
     * @return whatever the seven-argument overload returns
     */
    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2) throws Exception {
        return testInsertAndClustering(hoodieClusteringConfig, z, z2, false, "", "", "");
    }

    /**
     * Writes two insert batches via {@code testInsertTwoBatches}, runs {@code testClustering} on
     * the result and returns the inserted records.
     *
     * @param hoodieClusteringConfig clustering configuration handed to {@code testClustering}
     * @param z    passed to both {@code testInsertTwoBatches} and {@code testClustering}
     * @param z2   passed to {@code testClustering}
     * @param z3   passed to {@code testClustering}
     * @param str  passed to {@code testClustering}
     * @param str2 passed to {@code testClustering}
     * @param str3 passed to {@code testClustering}
     * @return the combined record list of the two insert batches
     */
    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2, boolean z3, String str, String str2, String str3) throws Exception {
        Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> insertResult = testInsertTwoBatches(z);
        testClustering(hoodieClusteringConfig, z, z2, z3, str, str2, str3, insertResult);
        Pair<List<HoodieRecord>, List<String>> recordsAndInstants = insertResult.getLeft();
        return recordsAndInstants.getLeft();
    }

    /**
     * Writes two batches of 200 inserts each into partition {@code 2015/03/16} and asserts that
     * the two batches landed in disjoint file groups.
     *
     * @param z forwarded to {@code getSmallInsertWriteConfig} and {@code writeAndVerifyBatch};
     *          when {@code true} an empty {@code Properties} is used, otherwise
     *          {@code getPropertiesForKeyGen()}
     * @return a pair whose left side holds (all inserted records, the two instant times) and whose
     *         right side holds the union of the file group ids written by both batches
     */
    private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean z) throws IOException {
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        SparkRDDWriteClient client = getHoodieWriteClient(getSmallInsertWriteConfig(2000, schema, 10L, false, z, z ? new Properties() : getPropertiesForKeyGen()));
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2015/03/16"});

        // First batch.
        String firstInstant = HoodieActiveTimeline.createNewInstantTime();
        List<HoodieRecord> firstBatch = this.dataGen.generateInserts(firstInstant, 200);
        Set<HoodieFileGroupId> firstGroups = getFileGroupIdsFromWriteStatus(writeAndVerifyBatch(client, firstBatch, firstInstant, z));

        // Second batch.
        String secondInstant = HoodieActiveTimeline.createNewInstantTime();
        List<HoodieRecord> secondBatch = this.dataGen.generateInserts(secondInstant, 200);
        Set<HoodieFileGroupId> secondGroups = getFileGroupIdsFromWriteStatus(writeAndVerifyBatch(client, secondBatch, secondInstant, z));

        Set<HoodieFileGroupId> allGroups = new HashSet<>(firstGroups);
        allGroups.addAll(secondGroups);

        // The batches must not share any file group.
        Set<HoodieFileGroupId> sharedGroups = new HashSet<>(firstGroups);
        sharedGroups.retainAll(secondGroups);
        Assertions.assertEquals(0, sharedGroups.size());

        List<HoodieRecord> allRecords = Stream.concat(firstBatch.stream(), secondBatch.stream()).collect(Collectors.toList());
        List<String> instantTimes = Arrays.asList(firstInstant, secondInstant);
        return Pair.of(Pair.of(allRecords, instantTimes), allGroups);
    }

    /**
     * Runs {@code performClustering} on previously inserted data and performs optional checks.
     *
     * @param hoodieClusteringConfig clustering configuration to use
     * @param z    forwarded to {@code performClustering} and {@code verifyRecordsWritten}
     * @param z2   forwarded to {@code performClustering}; when {@code true} the records are also
     *             verified against the latest completed replace instant
     * @param z3   when {@code true}, the file groups reported in the clustering write stats must
     *             equal the file group ids in {@code pair.getRight()}
     * @param str  forwarded to {@code performClustering}
     * @param str2 forwarded to {@code performClustering}
     * @param str3 forwarded to {@code performClustering}
     * @param pair result of {@code testInsertTwoBatches}
     */
    private void testClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2, boolean z3, String str, String str2, String str3, Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> pair) throws IOException {
        HoodieWriteConfig verificationConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
                .withAutoCommit(false)
                .withClusteringConfig(hoodieClusteringConfig)
                .withProps(getPropertiesForKeyGen())
                .build();
        HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = performClustering(hoodieClusteringConfig, z, z2, str, str2, str3, (Pair) pair.getLeft());

        if (z3) {
            Set insertedGroups = (Set) pair.getRight();
            Set clusteredGroups = (Set) ((List) clusterMetadata.getWriteStats().get()).stream().map(stat -> new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId())).collect(Collectors.toSet());
            Assertions.assertEquals(insertedGroups, clusteredGroups);
        }

        if (z2) {
            String replaceInstantTime = ((HoodieInstant) this.metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().getReverseOrderedInstants().findFirst().get()).getTimestamp();
            List insertedRecords = (List) ((Pair) pair.getLeft()).getLeft();
            List writeStatuses = ((JavaRDD) clusterMetadata.getWriteStatuses()).collect();
            verifyRecordsWritten(replaceInstantTime, z, insertedRecords, writeStatuses, verificationConfig);
        }
    }

    /**
     * Schedules and executes clustering with the given configuration and verifies the records
     * written by it.
     *
     * <p>When the write config both preserves commit metadata for clustering and populates meta
     * fields, the records are checked with {@code verifyRecordsWrittenWithPreservedMetadata};
     * otherwise with {@code verifyRecordsWritten} against the clustering instant time.
     *
     * @param hoodieClusteringConfig clustering configuration to use
     * @param z    when {@code true} an empty {@code Properties} is used for the write config,
     *             otherwise {@code getPropertiesForKeyGen()}; also forwarded to
     *             {@code verifyRecordsWritten}
     * @param z2   second argument of {@code SparkRDDWriteClient.cluster}
     * @param str  pre-commit validator class name ({@code null} is treated as empty)
     * @param str2 equality SQL queries for the pre-commit validator
     * @param str3 single-result SQL queries for the pre-commit validator
     * @param pair left: records expected in the table; right: instant times of the inserts
     * @return the metadata returned by {@code SparkRDDWriteClient.cluster}
     */
    private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2, String str, String str2, String str3, Pair<List<HoodieRecord>, List<String>> pair) throws IOException {
        HoodieWriteConfig build = getConfigBuilder().withAutoCommit(false).withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(StringUtils.nullToEmpty(str)).withPrecommitValidatorEqualitySqlQueries(str2).withPrecommitValidatorSingleResultSqlQueries(str3).build()).withProps(z ? new Properties() : getPropertiesForKeyGen()).withClusteringConfig(hoodieClusteringConfig).build();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        String clusteringInstantTime = hoodieWriteClient.scheduleClustering(Option.empty()).get().toString();
        HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster = hoodieWriteClient.cluster(clusteringInstantTime, z2);
        if (build.isPreserveHoodieCommitMetadataForClustering() && build.populateMetaFields()) {
            verifyRecordsWrittenWithPreservedMetadata(new HashSet((Collection) pair.getRight()), (List) pair.getLeft(), ((JavaRDD) cluster.getWriteStatuses()).collect());
        } else {
            verifyRecordsWritten(clusteringInstantTime, z, (List) pair.getLeft(), ((JavaRDD) cluster.getWriteStatuses()).collect(), build);
        }
        // The set of replaced file group ids that used to be collected here was never read, so it has been removed.
        return cluster;
    }

    /**
     * Collects the distinct file group ids (partition path + file id) referenced by the given
     * write statuses.
     *
     * @param list write statuses to inspect
     * @return set with one {@code HoodieFileGroupId} per distinct (partition path, file id) pair
     */
    private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> list) {
        Set<HoodieFileGroupId> fileGroupIds = new HashSet<>();
        for (WriteStatus status : list) {
            fileGroupIds.add(new HoodieFileGroupId(status.getPartitionPath(), status.getFileId()));
        }
        return fileGroupIds;
    }

    /**
     * Insert-overwrite scenario where the overwriting batch (3000 records) is larger than the
     * initial batch (1000 records).
     *
     * @param z forwarded to {@code verifyInsertOverwritePartitionHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testInsertOverwritePartitionHandlingWithMoreRecords(boolean z) throws Exception {
        final int initialBatchSize = 1000;
        final int overwriteBatchSize = 3000;
        verifyInsertOverwritePartitionHandling(initialBatchSize, overwriteBatchSize, z);
    }

    /**
     * Insert-overwrite scenario where the overwriting batch (1000 records) is smaller than the
     * initial batch (3000 records).
     *
     * @param z forwarded to {@code verifyInsertOverwritePartitionHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testInsertOverwritePartitionHandlingWithFewerRecords(boolean z) throws Exception {
        final int initialBatchSize = 3000;
        final int overwriteBatchSize = 1000;
        verifyInsertOverwritePartitionHandling(initialBatchSize, overwriteBatchSize, z);
    }

    /**
     * Insert-overwrite scenario where the initial and the overwriting batch both contain 3000
     * records.
     *
     * @param z forwarded to {@code verifyInsertOverwritePartitionHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords(boolean z) throws Exception {
        final int batchSize = 3000;
        verifyInsertOverwritePartitionHandling(batchSize, batchSize, z);
    }

    /**
     * Inserts {@code i} records into partition {@code americas} at commit 001, then overwrites
     * them with {@code i2} records via {@code insertOverwrite} at replace commit 002.
     *
     * <p>Asserts that the file ids reported as replaced for {@code americas} equal the file ids
     * written by the first batch, and that the records written at 002 match the second batch.
     *
     * @param i  number of records in the initial batch
     * @param i2 number of records in the overwriting batch
     * @param z  forwarded to the write config, {@code writeAndVerifyBatch} and
     *           {@code verifyRecordsWritten}; when {@code true} an empty {@code Properties} is used
     */
    private void verifyInsertOverwritePartitionHandling(int i, int i2, boolean z) throws Exception {
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getSmallInsertWriteConfig(2000, schema, this.dataGen.getEstimatedFileSizeInBytes(150), z, z ? new Properties() : getPropertiesForKeyGen());
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
        this.dataGen = new HoodieTestDataGenerator(new String[]{"americas"});

        // Commit 001: initial batch.
        List<WriteStatus> initialStatuses = writeAndVerifyBatch(client, this.dataGen.generateInserts("001", Integer.valueOf(i)), "001", z);
        Set<String> initialFileIds = getFileIdsFromWriteStatus(initialStatuses);

        // Replace commit 002: overwrite the partition.
        client.startCommitWithTime("002", "replacecommit");
        List<HoodieRecord> overwriteRecords = this.dataGen.generateInserts("002", Integer.valueOf(i2));
        ArrayList overwriteInput = new ArrayList(overwriteRecords);
        HoodieWriteResult overwriteResult = client.insertOverwrite(this.jsc.parallelize(overwriteInput, 2), "002");
        List<WriteStatus> overwriteStatuses = overwriteResult.getWriteStatuses().collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(overwriteStatuses);

        HashSet replacedFileIds = new HashSet((Collection) overwriteResult.getPartitionToReplaceFileIds().get("americas"));
        Assertions.assertEquals(initialFileIds, replacedFileIds);
        verifyRecordsWritten("002", z, overwriteRecords, overwriteStatuses, writeConfig);
    }

    /**
     * Collects the distinct file ids referenced by the given write statuses.
     *
     * @param list write statuses to inspect
     * @return set of the file ids returned by {@code WriteStatus.getFileId()}
     */
    private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> list) {
        return list.stream()
                .map(WriteStatus::getFileId)
                .collect(Collectors.toSet());
    }

    /**
     * Delete-partitions scenario where the first partition receives fewer records (1000) than the
     * second and third partitions (3000 each).
     *
     * @param z forwarded to {@code verifyDeletePartitionsHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition(boolean z) throws Exception {
        final int firstPartitionRecords = 1000;
        final int secondPartitionRecords = 3000;
        final int thirdPartitionRecords = 3000;
        verifyDeletePartitionsHandling(firstPartitionRecords, secondPartitionRecords, thirdPartitionRecords, z);
    }

    /**
     * Delete-partitions scenario where all three partitions receive 3000 records.
     *
     * @param z forwarded to {@code verifyDeletePartitionsHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords(boolean z) throws Exception {
        final int recordsPerPartition = 3000;
        verifyDeletePartitionsHandling(recordsPerPartition, recordsPerPartition, recordsPerPartition, z);
    }

    /**
     * Delete-partitions scenario where the first partition receives 3000 records and the second
     * and third partitions receive 1000 each.
     *
     * @param z forwarded to {@code verifyDeletePartitionsHandling}
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition(boolean z) throws Exception {
        final int firstPartitionRecords = 3000;
        final int secondPartitionRecords = 1000;
        final int thirdPartitionRecords = 1000;
        verifyDeletePartitionsHandling(firstPartitionRecords, secondPartitionRecords, thirdPartitionRecords, z);
    }

    /**
     * Upserts {@code i} generated inserts for a single partition at the given commit time and
     * verifies the written records.
     *
     * @param sparkRDDWriteClient client used for the write
     * @param i    number of records to generate
     * @param str  commit (instant) time
     * @param str2 partition the records are generated for
     * @return the distinct file ids reported by the resulting write statuses
     */
    private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient sparkRDDWriteClient, int i, String str, String str2) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> partitionRecords = this.dataGen.generateInsertsForPartition(str, Integer.valueOf(i), str2);
        List<WriteStatus> writeStatuses = sparkRDDWriteClient.upsert(this.jsc.parallelize(partitionRecords, 2), str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
        Set<String> writtenFileIds = writeStatuses.stream()
                .map(WriteStatus::getFileId)
                .collect(Collectors.toSet());
        verifyRecordsWritten(str, true, partitionRecords, writeStatuses, sparkRDDWriteClient.getConfig());
        return writtenFileIds;
    }

    /**
     * Starts a {@code replacecommit} at the given time, deletes the given partitions and returns
     * every file id reported as replaced, across all partitions.
     *
     * @param sparkRDDWriteClient client used for the delete
     * @param str  commit (instant) time
     * @param list partitions to delete
     * @return union of the replaced file ids of all partitions in the result
     */
    private Set<String> deletePartitionWithCommit(SparkRDDWriteClient sparkRDDWriteClient, String str, List<String> list) {
        sparkRDDWriteClient.startCommitWithTime(str, "replacecommit");
        return (Set) sparkRDDWriteClient.deletePartitions(list, str)
                .getPartitionToReplaceFileIds()
                .values()
                .stream()
                .flatMap(replacedFileIds -> ((List) replacedFileIds).stream())
                .collect(Collectors.toSet());
    }

    /**
     * Inserts records into three partitions (commits 001-003), then deletes the first partition
     * (commit 004) and the remaining two (commit 005), checking after each delete that the
     * replaced file ids match the inserted ones and that the expected partitions have no latest
     * base files left.
     *
     * @param i  number of records for partition {@code 2016/03/15}
     * @param i2 number of records for partition {@code 2015/03/16}
     * @param i3 number of records for partition {@code 2015/03/17}
     * @param z  forwarded to the write config; when {@code true} an empty {@code Properties} is used
     */
    private void verifyDeletePartitionsHandling(int i, int i2, int i3, boolean z) throws Exception {
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        SparkRDDWriteClient client = getHoodieWriteClient(getSmallInsertWriteConfig(2000, schema, this.dataGen.getEstimatedFileSizeInBytes(150), z, z ? new Properties() : getPropertiesForKeyGen()));
        this.dataGen = new HoodieTestDataGenerator();

        // Commits 001-003: one insert batch per partition.
        Set<String> firstPartitionFileIds = insertPartitionRecordsWithCommit(client, i, "001", "2016/03/15");
        Set<String> secondPartitionFileIds = insertPartitionRecordsWithCommit(client, i2, "002", "2015/03/16");
        Set<String> thirdPartitionFileIds = insertPartitionRecordsWithCommit(client, i3, "003", "2015/03/17");

        String firstGlob = String.format("%s/%s/*", this.basePath, "2016/03/15");
        String secondGlob = String.format("%s/%s/*", this.basePath, "2015/03/16");
        String thirdGlob = String.format("%s/%s/*", this.basePath, "2015/03/17");

        // Commit 004: drop the first partition only.
        Assertions.assertEquals(firstPartitionFileIds, deletePartitionWithCommit(client, "004", Arrays.asList("2016/03/15")));
        Assertions.assertEquals(0, HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.fs, firstGlob).size());
        Assertions.assertTrue(HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.fs, secondGlob).size() > 0);
        Assertions.assertTrue(HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.fs, thirdGlob).size() > 0);

        // Commit 005: drop the two remaining partitions together.
        Set<String> replacedFileIds = deletePartitionWithCommit(client, "005", Arrays.asList("2015/03/16", "2015/03/17"));
        Set<String> expectedReplaced = new HashSet<>(secondPartitionFileIds);
        expectedReplaced.addAll(thirdPartitionFileIds);
        Assertions.assertEquals(expectedReplaced, replacedFileIds);
        Assertions.assertEquals(0, HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.fs, firstGlob, secondGlob, thirdGlob).size());
    }

    /**
     * Reads back the records referenced by the write statuses and checks them against the
     * expected records.
     *
     * <p>With meta fields populated, every record must carry {@code str} as
     * {@code _hoodie_commit_time} and a {@code _hoodie_record_key} that is among the expected
     * keys. Without meta fields, the record key is derived through a key generator built from the
     * write config's properties; additionally, when {@code z} is {@code false},
     * {@code _hoodie_commit_time} must be {@code null}.
     *
     * @param str   expected commit time
     * @param z     see above; only consulted when meta fields are not populated
     * @param list  expected records
     * @param list2 write statuses whose files are read back
     * @param hoodieWriteConfig write config that decides which branch applies
     */
    private void verifyRecordsWritten(String str, boolean z, List<HoodieRecord> list, List<WriteStatus> list2, HoodieWriteConfig hoodieWriteConfig) throws IOException {
        List<GenericRecord> writtenRecords = new ArrayList<>();
        Set<String> expectedKeys = verifyRecordKeys(list, list2, writtenRecords);
        if (hoodieWriteConfig.populateMetaFields()) {
            for (GenericRecord written : writtenRecords) {
                String recordKey = written.get("_hoodie_record_key").toString();
                Assertions.assertEquals(str, written.get("_hoodie_commit_time").toString());
                Assertions.assertTrue(expectedKeys.contains(recordKey));
            }
        } else {
            KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieWriteConfig.getProps()));
            for (GenericRecord written : writtenRecords) {
                String recordKey = keyGenerator.getKey(written).getRecordKey();
                if (!z) {
                    Assertions.assertNull(written.get("_hoodie_commit_time"));
                }
                Assertions.assertTrue(expectedKeys.contains(recordKey));
            }
        }
    }

    /**
     * Loads every Avro record from the base files named by the write statuses into {@code list3}
     * and asserts that the number of loaded records equals the number of distinct expected keys.
     *
     * @param list  records that were handed to the writer
     * @param list2 write statuses whose stat paths (relative to the table base path) are read
     * @param list3 output list; all records read from the base files are appended to it
     * @return the record keys of {@code list}
     */
    @NotNull
    private Set<String> verifyRecordKeys(List<HoodieRecord> list, List<WriteStatus> list2, List<GenericRecord> list3) {
        for (WriteStatus status : list2) {
            Path baseFile = new Path(this.basePath, status.getStat().getPath());
            list3.addAll(BaseFileUtils.getInstance(this.metaClient).readAvroRecords(this.jsc.hadoopConfiguration(), baseFile));
        }
        Set<String> expectedKeys = Transformations.recordsToRecordKeySet(list);
        Assertions.assertEquals(list3.size(), expectedKeys.size());
        return expectedKeys;
    }

    /**
     * Reads back the base files referenced by the write statuses and checks that the stored meta
     * fields are consistent with the expectations.
     *
     * <p>Asserts that every {@code _hoodie_commit_time} found is contained in {@code set}, that every
     * {@code _hoodie_record_key} is an expected key, and that the file id derived from
     * {@code _hoodie_file_name} belongs to one of the given write statuses.
     *
     * @param set   commit times that records are allowed to carry
     * @param list  records that were handed to the writer
     * @param list2 write statuses naming the base files to read back
     */
    private void verifyRecordsWrittenWithPreservedMetadata(Set<String> set, List<HoodieRecord> list, List<WriteStatus> list2) {
        // Typed list: with a raw ArrayList the stream element type degrades to Object.
        List<GenericRecord> writtenRecords = new ArrayList<>();
        Set<String> expectedKeys = verifyRecordKeys(list, list2, writtenRecords);

        // Only the distinct commit times matter, so collect them directly instead of grouping.
        Set<String> observedCommitTimes = writtenRecords.stream()
                .map(written -> written.get("_hoodie_commit_time").toString())
                .collect(Collectors.toSet());
        Assertions.assertTrue(set.containsAll(observedCommitTimes));

        Set<String> expectedFileIds = list2.stream().map(WriteStatus::getFileId).collect(Collectors.toSet());
        for (GenericRecord written : writtenRecords) {
            Assertions.assertTrue(expectedKeys.contains(written.get("_hoodie_record_key").toString()));
            Assertions.assertTrue(expectedFileIds.contains(FSUtils.getFileId(written.get("_hoodie_file_name").toString())));
        }
    }

    /**
     * Starts the given commit, upserts the records with a parallelism of 2, asserts that no write
     * errors occurred and verifies the written records via {@code verifyRecordsWritten}.
     *
     * @param sparkRDDWriteClient client used for the write
     * @param list                records to upsert
     * @param str                 commit time of the write
     * @param z                   forwarded unchanged to {@code verifyRecordsWritten}
     * @return the collected write statuses of the upsert
     * @throws IOException propagated from the verification step
     */
    private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient sparkRDDWriteClient, List<HoodieRecord> list, String str, boolean z) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        JavaRDD<HoodieRecord> inputRecords = this.jsc.parallelize(list, 2);
        List<WriteStatus> statuses = sparkRDDWriteClient.upsert(inputRecords, str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
        HoodieWriteConfig writeConfig = sparkRDDWriteClient.getConfig();
        verifyRecordsWritten(str, z, list, statuses, writeConfig);
        return statuses;
    }

    /**
     * Generates {@code i} inserts plus one update for each of them, upserts both in a single commit
     * and asserts that the table then contains {@code i2} records across all generator partitions.
     *
     * @param str                 commit time used for record generation and for the write
     * @param sparkRDDWriteClient client used for the write
     * @param i                   number of insert records to generate
     * @param i2                  expected total record count in the table after the write
     * @return pair of (record keys of the generated inserts, the generated inserts)
     * @throws IOException if update generation fails
     */
    private Pair<Set<String>, List<HoodieRecord>> testUpdates(String str, SparkRDDWriteClient sparkRDDWriteClient, int i, int i2) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> inserts = this.dataGen.generateInserts(str, i);
        Set<String> insertedKeys = Transformations.recordsToRecordKeySet(inserts);

        // Inserts and their updates go into the same batch.
        List<HoodieRecord> insertsThenUpdates = new ArrayList<>(inserts);
        insertsThenUpdates.addAll(this.dataGen.generateUpdates(str, inserts));

        List<WriteStatus> statuses = sparkRDDWriteClient.upsert(this.jsc.parallelize(insertsThenUpdates, 1), str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);

        // One glob per generator partition, e.g. <basePath>/<partition>/*.
        String[] partitionGlobs = Arrays.stream(this.dataGen.getPartitionPaths())
                .map(partition -> String.format("%s/%s/*", this.basePath, partition))
                .toArray(String[]::new);
        Assertions.assertEquals(i2, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count(), "Must contain " + i2 + " records");
        return Pair.of(insertedKeys, inserts);
    }

    /**
     * Deletes a random selection of {@code i} keys from {@code list} in commit {@code str2} and
     * verifies the outcome.
     *
     * <p>Asserts that exactly one write status is produced, that it targets file id {@code str},
     * that the table and the rewritten file both contain {@code i2} records, and that every record
     * left in the file has a key from {@code list2} and is not one of the deleted keys.
     *
     * @param sparkRDDWriteClient client used for the delete
     * @param list                records to pick the keys to delete from
     * @param i                   number of keys to delete
     * @param str                 file id the delete is expected to rewrite
     * @param str2                commit time of the delete
     * @param i2                  expected record count after the delete
     * @param list2               record keys that are allowed to remain in the file
     */
    private void testDeletes(SparkRDDWriteClient sparkRDDWriteClient, List<HoodieRecord> list, int i, String str, String str2, int i2, List<String> list2) {
        sparkRDDWriteClient.startCommitWithTime(str2);
        List<org.apache.hudi.common.model.HoodieKey> keysToDelete = Transformations.randomSelectAsHoodieKeys(list, i);
        // Compare on record-key strings: checking a String against the key objects themselves
        // never matches, which made the "Key deleted" assertion below vacuous.
        Set<String> deletedRecordKeys = keysToDelete.stream()
                .map(org.apache.hudi.common.model.HoodieKey::getRecordKey)
                .collect(Collectors.toSet());

        List<WriteStatus> statuses = sparkRDDWriteClient.delete(this.jsc.parallelize(keysToDelete, 1), str2).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
        Assertions.assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
        WriteStatus onlyStatus = statuses.get(0);
        Assertions.assertEquals(str, onlyStatus.getFileId(), "Existing file should be expanded");

        // One glob per generator partition, e.g. <basePath>/<partition>/*.
        String[] partitionGlobs = Arrays.stream(this.dataGen.getPartitionPaths())
                .map(partition -> String.format("%s/%s/*", this.basePath, partition))
                .toArray(String[]::new);
        Assertions.assertEquals(i2, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count(), "Must contain " + i2 + " records");

        Path rewrittenFile = new Path(this.basePath, onlyStatus.getStat().getPath());
        Assertions.assertEquals(i2, BaseFileUtils.getInstance(this.metaClient).readRowKeys(this.hadoopConf, rewrittenFile).size(), "file should contain " + i2 + " records");
        for (GenericRecord remaining : BaseFileUtils.getInstance(this.metaClient).readAvroRecords(this.hadoopConf, rewrittenFile)) {
            String recordKey = remaining.get("_hoodie_record_key").toString();
            Assertions.assertTrue(list2.contains(recordKey), "key expected to be part of " + str2);
            Assertions.assertFalse(deletedRecordKeys.contains(recordKey), "Key deleted");
        }
    }

    /**
     * Issues a delete against a table that has never received an insert and expects the write
     * client to fail with a {@code HoodieIOException}.
     *
     * @param z when {@code true} an empty {@link Properties} is passed to the write config,
     *          otherwise the properties from {@code getPropertiesForKeyGen()} are used
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testDeletesWithoutInserts(boolean z) {
        final String instantTime = "001";
        // Avro schema of the generated trip records, passed verbatim to the write config.
        final String tripSchemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getSmallInsertWriteConfig(100, tripSchemaJson, this.dataGen.getEstimatedFileSizeInBytes(150), z, z ? new Properties() : getPropertiesForKeyGen());
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        writeClient.startCommitWithTime(instantTime);

        // Keys of 20 freshly generated records that were never written to the table.
        List<HoodieRecord> unwrittenRecords = this.dataGen.generateInserts(instantTime, 20);
        JavaRDD keysToDelete = this.jsc.parallelize(Transformations.randomSelectAsHoodieKeys(unwrittenRecords, 20), 1);
        Assertions.assertThrows(HoodieIOException.class, () -> writeClient.delete(keysToDelete, instantTime).collect(), "Should have thrown Exception");
    }

    /**
     * Bulk-inserts 200 records in commit {@code "000"}, commits explicitly and checks that the file
     * paths recorded in the commit file on storage agree with those obtained from the commit
     * metadata read through the timeline.
     *
     * <p>Both the write client and the commit-file stream are managed with try-with-resources so
     * they are closed on every path and close failures are attached as suppressed exceptions.
     *
     * @param z forwarded to {@code addConfigsForPopulateMetaFields}
     * @throws Exception on any write, read or close failure
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testCommitWritesRelativePaths(boolean z) throws Exception {
        final String instantTime = "000";
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(configBuilder, z);
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(configBuilder.build())) {
            HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
            HoodieSparkTable table = HoodieSparkTable.create(configBuilder.build(), this.context, tableMetaClient);

            writeClient.startCommitWithTime(instantTime);
            Assertions.assertTrue(writeClient.commit(instantTime, writeClient.bulkInsert(this.jsc.parallelize(this.dataGen.generateInserts(instantTime, 200), 1), instantTime)), "Commit should succeed");
            Assertions.assertTrue(this.testTable.commitExists(instantTime), "After explicit commit, commit file should be created");

            // Paths as reported by the commit metadata loaded through the completed timeline.
            HoodieCommitMetadata timelineMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) tableMetaClient.getCommitTimeline().filterCompletedInstants().getInstantDetails(new HoodieInstant(false, tableMetaClient.getCommitActionType(), instantTime)).get(), HoodieCommitMetadata.class);
            String tableBasePath = table.getMetaClient().getBasePath();
            Collection<String> timelinePaths = timelineMetadata.getFileIdAndFullPaths(new Path(tableBasePath)).values();

            // Paths as reported by the commit file read directly from storage.
            try (FSDataInputStream commitFileStream = this.fs.open(this.testTable.getCommitFilePath(instantTime))) {
                HoodieCommitMetadata storedMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromJsonString(FileIOUtils.readAsUTFString(commitFileStream), HoodieCommitMetadata.class);
                for (String storedPath : storedMetadata.getFileIdAndFullPaths(new Path(tableBasePath)).values()) {
                    Assertions.assertTrue(timelinePaths.contains(storedPath));
                }
            }
        }
    }

    /**
     * Verifies the insert/update counters stored in commit metadata.
     *
     * <p>Commit {@code "000"} bulk-inserts 200 records and must report 200 inserts. Commit
     * {@code "001"} upserts updates for those records and must report 0 inserts and 200 update
     * writes. Counters are summed over all write stats of all partitions in the commit file.
     *
     * <p>Each commit-file stream is opened with try-with-resources so that it is closed even when a
     * read or an assertion fails.
     *
     * @param z forwarded to {@code addConfigsForPopulateMetaFields}
     * @throws Exception on any write, read or close failure
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testMetadataStatsOnCommit(boolean z) throws Exception {
        final String insertInstant = "000";
        final String updateInstant = "001";
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(configBuilder, z);
        SparkRDDWriteClient writeClient = getHoodieWriteClient(configBuilder.build());

        writeClient.startCommitWithTime(insertInstant);
        List<HoodieRecord> inserts = this.dataGen.generateInserts(insertInstant, 200);
        Assertions.assertTrue(writeClient.commit(insertInstant, writeClient.bulkInsert(this.jsc.parallelize(inserts, 1), insertInstant)), "Commit should succeed");
        Assertions.assertTrue(this.testTable.commitExists(insertInstant), "After explicit commit, commit file should be created");
        try (FSDataInputStream commitFileStream = this.fs.open(this.testTable.getCommitFilePath(insertInstant))) {
            HoodieCommitMetadata metadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromJsonString(FileIOUtils.readAsUTFString(commitFileStream), HoodieCommitMetadata.class);
            long insertCount = 0;
            for (List<HoodieWriteStat> partitionStats : metadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    insertCount += stat.getNumInserts();
                }
            }
            Assertions.assertEquals(200, insertCount);
        }

        writeClient.startCommitWithTime(updateInstant);
        Assertions.assertTrue(writeClient.commit(updateInstant, writeClient.upsert(this.jsc.parallelize(this.dataGen.generateUpdates(updateInstant, inserts), 1), updateInstant)), "Commit should succeed");
        Assertions.assertTrue(this.testTable.commitExists(updateInstant), "After explicit commit, commit file should be created");
        try (FSDataInputStream commitFileStream = this.fs.open(this.testTable.getCommitFilePath(updateInstant))) {
            HoodieCommitMetadata metadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromJsonString(FileIOUtils.readAsUTFString(commitFileStream), HoodieCommitMetadata.class);
            long insertCount = 0;
            long updateCount = 0;
            for (List<HoodieWriteStat> partitionStats : metadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    insertCount += stat.getNumInserts();
                    updateCount += stat.getNumUpdateWrites();
                }
            }
            Assertions.assertEquals(0, insertCount);
            Assertions.assertEquals(200, updateCount);
        }
    }

    /**
     * Runs {@code testConsistencyCheck} for commit {@code "000"}, deletes the path it returns and
     * then checks that the commit file exists and the marker folder is gone.
     *
     * <p>When {@code z} is {@code false} the test first commits explicitly and asserts that the
     * commit succeeds; when {@code z} is {@code true} no explicit commit is issued here.
     *
     * @param z value passed to {@code withEnableOptimisticConsistencyGuard} and to
     *          {@code testConsistencyCheck}
     * @throws Exception on any write or file-system failure
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testConsistencyCheckDuringFinalize(boolean z) throws Exception {
        final String instantTime = "000";
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        HoodieWriteConfig writeConfig = getConfigBuilder()
                .withAutoCommit(false)
                .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withEnableOptimisticConsistencyGuard(z).build())
                .build();
        SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);

        Pair<Path, JavaRDD<WriteStatus>> checkResult = testConsistencyCheck(tableMetaClient, instantTime, z);
        tableMetaClient.getFs().delete(checkResult.getKey(), false);

        if (!z) {
            // Without the optimistic guard the commit has to be issued explicitly.
            Assertions.assertTrue(writeClient.commit(instantTime, checkResult.getRight()), "Commit should succeed");
        }
        Assertions.assertTrue(this.testTable.commitExists(instantTime), "After explicit commit, commit file should be created");
        Assertions.assertFalse(tableMetaClient.getFs().exists(new Path(tableMetaClient.getMarkerFolderPath(instantTime))));
    }

    /**
     * Shared scenario for the rollback-after-consistency-check tests on commit
     * {@code "00000000000010"}.
     *
     * <p>Without the optimistic consistency guard ({@code z2 == false}) the commit is rolled back and
     * neither the commit file nor the marker folder may remain. With the optimistic guard the commit
     * file must exist and the marker folder must be gone; a subsequent rollback must then either
     * remove the commit file ({@code z == false}) or fail with a {@code HoodieRollbackException}
     * ({@code z == true}).
     *
     * @param z  value passed to {@code withRollbackUsingMarkers}
     * @param z2 value passed to {@code withEnableOptimisticConsistencyGuard}
     * @param z3 when {@code false}, the properties from {@code getPropertiesForKeyGen()} are used
     *           instead of empty properties (only applied to the optimistic-guard config)
     * @throws Exception on any write or file-system failure
     */
    private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean z, boolean z2, boolean z3) throws Exception {
        final String instantTime = "00000000000010";
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        Properties keyGenProps = z3 ? new Properties() : getPropertiesForKeyGen();

        HoodieWriteConfig writeConfig;
        if (z2) {
            writeConfig = getConfigBuilder().withRollbackUsingMarkers(z).withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withEnableOptimisticConsistencyGuard(z2).withOptimisticConsistencyGuardSleepTimeMs(1L).build()).withProperties(keyGenProps).build();
        } else {
            writeConfig = getConfigBuilder().withRollbackUsingMarkers(z).withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(z2).build()).build();
        }
        SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        testConsistencyCheck(tableMetaClient, instantTime, z2);

        if (!z2) {
            writeClient.rollback(instantTime);
            Assertions.assertFalse(this.testTable.commitExists(instantTime), "After explicit rollback, commit file should not be present");
            Assertions.assertFalse(tableMetaClient.getFs().exists(new Path(tableMetaClient.getMarkerFolderPath(instantTime))));
            return;
        }

        Assertions.assertTrue(this.testTable.commitExists(instantTime), "With optimistic CG, first commit should succeed. commit file should be present");
        Assertions.assertFalse(tableMetaClient.getFs().exists(new Path(tableMetaClient.getMarkerFolderPath(instantTime))));
        if (z) {
            // assertThrows replaces the try / fail / empty-catch idiom and reports the actual outcome.
            Assertions.assertThrows(HoodieRollbackException.class, () -> writeClient.rollback(instantTime), "Rollback of completed commit should throw exception");
        } else {
            writeClient.rollback(instantTime);
            Assertions.assertFalse(this.testTable.commitExists(instantTime), "After explicit rollback, commit file should not be present");
        }
    }

    /**
     * Parameterized entry point that runs the shared rollback-after-consistency-check scenario
     * with {@code withRollbackUsingMarkers(false)}.
     *
     * @param z  forwarded as the optimistic-consistency-guard flag
     * @param z2 forwarded as the third flag of the shared scenario
     * @throws Exception propagated from the shared scenario
     */
    @MethodSource({"rollbackAfterConsistencyCheckFailureParams"})
    @ParameterizedTest
    public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean z, boolean z2) throws Exception {
        final boolean rollbackUsingMarkers = false;
        testRollbackAfterConsistencyCheckFailureUsingFileList(rollbackUsingMarkers, z, z2);
    }

    /**
     * Parameterized entry point that runs the shared rollback-after-consistency-check scenario
     * with {@code withRollbackUsingMarkers(true)}.
     *
     * @param z  forwarded as the optimistic-consistency-guard flag
     * @param z2 forwarded as the third flag of the shared scenario
     * @throws Exception propagated from the shared scenario
     */
    @MethodSource({"rollbackAfterConsistencyCheckFailureParams"})
    @ParameterizedTest
    public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean z, boolean z2) throws Exception {
        final boolean rollbackUsingMarkers = true;
        testRollbackAfterConsistencyCheckFailureUsingFileList(rollbackUsingMarkers, z, z2);
    }

    /**
     * Writes a sequence of batches ("100" to "500") with fresh write clients and checks how many
     * rollback, clean and completed-commit instants the timeline holds afterwards, depending on the
     * failed-writes cleaning policy.
     *
     * <p>NOTE(review): the trailing boolean of {@code writeBatch} presumably controls whether the
     * batch is committed; the inflight/completed counts asserted below are consistent with that, but
     * {@code writeBatch} is defined elsewhere, so confirm there.
     *
     * <p>Only the lazy and never policies are asserted at the end; any other policy runs the
     * scenario without final timeline assertions.
     *
     * @param hoodieFailedWritesCleaningPolicy policy used for every write config in this test
     * @param z                                forwarded to {@code getParallelWritingWriteConfig}
     * @throws Exception on any write failure or if the wait loop is interrupted
     */
    @MethodSource({"rollbackFailedCommitsParams"})
    @ParameterizedTest
    public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy, boolean z) throws Exception {
        HoodieTestUtils.init(this.hadoopConf, this.basePath);

        // First client: batch "100" (trailing flag true), then batch "200" (trailing flag false).
        SparkRDDWriteClient firstClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, z));
        writeBatch(firstClient, "100", "100", Option.of(Arrays.asList("100")), "100", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, true);
        writeBatch(firstClient, "200", "100", Option.of(Arrays.asList("200")), "100", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        firstClient.close();

        // Second client: batch "300" (trailing flag false).
        SparkRDDWriteClient secondClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, z));
        writeBatch(secondClient, "300", "200", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        secondClient.close();

        // Fresh generator, then batch "400" (trailing flag true) with a third client.
        this.dataGen = new HoodieTestDataGenerator();
        SparkRDDWriteClient thirdClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, z));
        writeBatch(thirdClient, "400", "300", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, true);

        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        // assertEquals (rather than assertTrue(a == b)) so a failure reports the actual count.
        Assertions.assertEquals(0, tableMetaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
        Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().filterInflights().countInstants());
        Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());

        // Poll until the heartbeat of instant "300" has expired; the sleep also runs once after the
        // positive check, exactly as before.
        boolean heartbeatExpired;
        do {
            heartbeatExpired = thirdClient.getHeartbeatClient().isHeartbeatExpired("300");
            Thread.sleep(2000L);
        } while (!heartbeatExpired);

        // Fourth client: batch "500" (trailing flag true), followed by a clean.
        SparkRDDWriteClient fourthClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, z));
        writeBatch(fourthClient, "500", "400", Option.of(Arrays.asList("500")), "500", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, true);
        fourthClient.clean();

        HoodieActiveTimeline reloadedTimeline = tableMetaClient.getActiveTimeline().reload();
        if (hoodieFailedWritesCleaningPolicy.isLazy()) {
            Assertions.assertEquals(2, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
            Assertions.assertEquals(0, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"clean"})).countInstants());
            Assertions.assertEquals(3, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
        } else if (hoodieFailedWritesCleaningPolicy.isNever()) {
            Assertions.assertEquals(0, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
            Assertions.assertEquals(0, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"clean"})).countInstants());
            Assertions.assertEquals(3, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
        }
    }

    /**
     * Writes batches under the EAGER cleaning policy, switches to LAZY for further batches and a
     * clean, then starts a commit under EAGER again, asserting the number of rollback instants
     * (3 after the clean, 5 at the end) and that exactly one commit is completed at the end.
     *
     * <p>NOTE(review): the trailing boolean of {@code writeBatch} presumably controls whether the
     * batch is committed; the final "1 completed commit" assertion is consistent with that, but
     * {@code writeBatch} is defined elsewhere, so confirm there.
     *
     * @param z forwarded to {@code getParallelWritingWriteConfig}
     * @throws Exception on any write failure or if the wait loop is interrupted
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testRollbackFailedCommitsToggleCleaningPolicy(boolean z) throws Exception {
        HoodieTestUtils.init(this.hadoopConf, this.basePath);

        // EAGER policy: batch "100" (trailing flag true), then batch "200" (trailing flag false).
        SparkRDDWriteClient eagerClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy.EAGER, z));
        writeBatch(eagerClient, "100", "100", Option.of(Arrays.asList("100")), "100", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, true);
        writeBatch(eagerClient, "200", "100", Option.of(Arrays.asList("200")), "200", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        eagerClient.close();

        // LAZY policy: batches "300" and "400", both with trailing flag false.
        HoodieFailedWritesCleaningPolicy lazyPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        SparkRDDWriteClient lazyClient300 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(lazyPolicy, z));
        writeBatch(lazyClient300, "300", "200", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        lazyClient300.close();
        SparkRDDWriteClient lazyClient400 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(lazyPolicy, z));
        writeBatch(lazyClient400, "400", "300", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        lazyClient400.close();

        // NOTE(review): lazyClient400 is deliberately still used after close() for the heartbeat
        // check and the clean, as in the existing test flow; confirm this is intended.
        boolean heartbeatExpired;
        do {
            heartbeatExpired = lazyClient400.getHeartbeatClient().isHeartbeatExpired("400");
            Thread.sleep(2000L);
        } while (!heartbeatExpired);
        lazyClient400.clean();
        // assertEquals (rather than assertTrue(a == b)) so a failure reports the actual count.
        Assertions.assertEquals(3, this.metaClient.getActiveTimeline().reload().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());

        // Still LAZY: batches "500" and "600", both with trailing flag false.
        SparkRDDWriteClient lazyClient500 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(lazyPolicy, z));
        writeBatch(lazyClient500, "500", "400", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        lazyClient500.close();
        SparkRDDWriteClient lazyClient600 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(lazyPolicy, z));
        writeBatch(lazyClient600, "600", "500", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
                (client, records, instant) -> client.bulkInsert(records, instant), false, 100, 300, 0, false);
        lazyClient600.close();

        // Back to EAGER: starting a new commit is the last action before the final counts.
        new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy.EAGER, z)).startCommit();
        HoodieActiveTimeline reloadedTimeline = this.metaClient.getActiveTimeline().reload();
        Assertions.assertEquals(5, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
        Assertions.assertEquals(1, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
    }

    @Test
    public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
        HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        HoodieTestUtils.init(this.hadoopConf, this.basePath);
        SparkRDDWriteClient sparkRDDWriteClient = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true));
        Option<List<String>> of = Option.of(Arrays.asList("100"));
        HoodieTestDataGenerator hoodieTestDataGenerator = this.dataGen;
        hoodieTestDataGenerator.getClass();
        writeBatch(sparkRDDWriteClient, "100", "100", of, "100", 100, hoodieTestDataGenerator::generateInserts, (v0, v1, v2) -> {
            return v0.bulkInsert(v1, v2);
        }, false, 100, 100, 0, true);
        Option<List<String>> of2 = Option.of(Arrays.asList("200"));
        HoodieTestDataGenerator hoodieTestDataGenerator2 = this.dataGen;
        hoodieTestDataGenerator2.getClass();
        writeBatch(sparkRDDWriteClient, "200", "100", of2, "200", 100, hoodieTestDataGenerator2::generateInserts, (v0, v1, v2) -> {
            return v0.bulkInsert(v1, v2);
        }, false, 100, 100, 0, false);
        sparkRDDWriteClient.close();
        SparkRDDWriteClient sparkRDDWriteClient2 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true));
        Option<List<String>> of3 = Option.of(Arrays.asList("300"));
        HoodieTestDataGenerator hoodieTestDataGenerator3 = this.dataGen;
        hoodieTestDataGenerator3.getClass();
        writeBatch(sparkRDDWriteClient2, "300", "200", of3, "300", 100, hoodieTestDataGenerator3::generateInserts, (v0, v1, v2) -> {
            return v0.bulkInsert(v1, v2);
        }, false, 100, 100, 0, false);
        sparkRDDWriteClient2.close();
        this.dataGen = new HoodieTestDataGenerator();
        newFixedThreadPool.submit(() -> {
            SparkRDDWriteClient sparkRDDWriteClient3 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true));
            Option<List<String>> of4 = Option.of(Arrays.asList("400"));
            HoodieTestDataGenerator hoodieTestDataGenerator4 = this.dataGen;
            hoodieTestDataGenerator4.getClass();
            return writeBatch(sparkRDDWriteClient3, "400", "300", of4, "300", 100, hoodieTestDataGenerator4::generateInserts, (v0, v1, v2) -> {
                return v0.bulkInsert(v1, v2);
            }, false, 100, 100, 0, true);
        }).get();
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build();
        Assertions.assertTrue(build.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants() == 0);
        Assertions.assertTrue(build.getActiveTimeline().filterInflights().countInstants() == 2);
        Assertions.assertTrue(build.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
        SparkRDDWriteClient sparkRDDWriteClient3 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true));
        boolean z = false;
        while (!z) {
            z = sparkRDDWriteClient3.getHeartbeatClient().isHeartbeatExpired("300");
            Thread.sleep(2000L);
        }
        Future submit = newFixedThreadPool.submit(() -> {
            SparkRDDWriteClient sparkRDDWriteClient4 = new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true));
            Option<List<String>> of4 = Option.of(Arrays.asList("500"));
            HoodieTestDataGenerator hoodieTestDataGenerator4 = this.dataGen;
            hoodieTestDataGenerator4.getClass();
            return writeBatch(sparkRDDWriteClient4, "500", "400", of4, "500", 100, hoodieTestDataGenerator4::generateInserts, (v0, v1, v2) -> {
                return v0.bulkInsert(v1, v2);
            }, false, 100, 100, 0, true);
        });
        Future submit2 = newFixedThreadPool.submit(() -> {
            return new SparkRDDWriteClient(this.context, getParallelWritingWriteConfig(hoodieFailedWritesCleaningPolicy, true)).clean();
        });
        submit.get();
        submit2.get();
        HoodieActiveTimeline reload = build.getActiveTimeline().reload();
        Assertions.assertTrue(reload.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants() == 2);
        Assertions.assertTrue(reload.getTimelineOfActions(CollectionUtils.createSet(new String[]{"clean"})).countInstants() == 0);
        Assertions.assertTrue(reload.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
    }

    /**
     * Bulk-inserts 200 records at the given instant with the consistency check enabled, plants one extra
     * marker that no write produced, and then attempts the commit.
     *
     * <p>When {@code z} is {@code true} the optimistic consistency guard is enabled and the commit is simply
     * issued. When {@code z} is {@code false} the commit must throw a {@link HoodieCommitException} whose
     * cause is a {@link HoodieIOException}.
     *
     * @param hoodieTableMetaClient meta client used to locate the marker folder and to build the table
     * @param str instant time used for the commit, the generated records and the marker folder
     * @param z value passed to {@code withEnableOptimisticConsistencyGuard}; also selects which branch is verified
     * @return the path of the planted marker paired with the write statuses of the bulk insert
     * @throws Exception if any of the write, marker or file-system calls fail
     */
    private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient hoodieTableMetaClient, String str, boolean z) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
        ConsistencyGuardConfig guardConfig;
        if (z) {
            guardConfig = ConsistencyGuardConfig.newBuilder()
                    .withConsistencyCheckEnabled(true)
                    .withEnableOptimisticConsistencyGuard(z)
                    .withOptimisticConsistencyGuardSleepTimeMs(1L)
                    .build();
        } else {
            guardConfig = ConsistencyGuardConfig.newBuilder()
                    .withConsistencyCheckEnabled(true)
                    .withMaxConsistencyCheckIntervalMs(1)
                    .withInitialConsistencyCheckIntervalMs(1)
                    .withEnableOptimisticConsistencyGuard(z)
                    .build();
        }
        HoodieWriteConfig writeConfig = cfgBuilder.withConsistencyGuardConfig(guardConfig).build();

        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
        client.startCommitWithTime(str);
        JavaRDD<WriteStatus> writeStatuses = client.bulkInsert(this.jsc.parallelize(this.dataGen.generateInserts(str, 200), 1), str);
        // Force the write to run so that marker files exist before we go looking for them.
        writeStatuses.collect();

        // Find the directory of any one marker the write produced; the dummy marker is planted next to it.
        String markerFolderPath = hoodieTableMetaClient.getMarkerFolderPath(str);
        String markerParentDir;
        if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
            String firstMarker = (String) MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(markerFolderPath, this.fs, this.context, 1)
                    .values()
                    .stream()
                    .flatMap(Collection::stream)
                    .findFirst()
                    .get();
            markerParentDir = new Path(markerFolderPath, firstMarker).getParent().toString();
        } else {
            Path markerGlob = new Path(String.format("%s/*/*/*/*", markerFolderPath));
            markerParentDir = Arrays.stream(this.fs.globStatus(markerGlob, candidate -> candidate.toString().contains(".marker")))
                    .limit(1L)
                    .map(status -> status.getPath().getParent().toString())
                    .collect(Collectors.toList())
                    .get(0);
        }

        // Plant a marker for a base file name that was never written.
        String dummyFileName = FSUtils.makeBaseFileName(str, "1-0-1", UUID.randomUUID().toString());
        Option<Path> dummyMarker = WriteMarkersFactory.get(writeConfig.getMarkersType(), getHoodieTable(hoodieTableMetaClient, writeConfig), str)
                .create(markerParentDir, dummyFileName, IOType.MERGE);
        LOG.info("Created a dummy marker path=" + dummyMarker.get());

        if (!z) {
            Throwable cause = Assertions.assertThrows(
                    HoodieCommitException.class,
                    () -> client.commit(str, writeStatuses),
                    "Commit should fail due to consistency check").getCause();
            Assertions.assertTrue(cause instanceof HoodieIOException);
        } else {
            client.commit(str, writeStatuses);
        }
        return Pair.of(dummyMarker.get(), writeStatuses);
    }

    /**
     * Verifies that one instant can carry the output of more than one write operation when
     * {@code withAllowMultiWriteOnSameInstant(true)} is configured.
     *
     * <p>The first instant bulk-inserts 200 records and is committed explicitly. The second instant runs a
     * bulk insert of 200 new records followed by an upsert of 200 updates, commits the union of both
     * results, and expects the table to hold 400 records afterwards.
     *
     * @param z parameter supplied by {@code populateMetaFieldsParams} and forwarded to
     *          {@code addConfigsForPopulateMetaFields}
     * @throws IOException if reading the table back fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testMultiOperationsPerCommit(boolean z) throws IOException {
        final String firstInstant = "0000";
        final String secondInstant = "0001";
        final int recordsPerBatch = 200;

        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false).withAllowMultiWriteOnSameInstant(true);
        addConfigsForPopulateMetaFields(configBuilder, z);
        SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());

        // First instant: a single bulk insert, committed explicitly because auto-commit is off.
        client.startCommitWithTime(firstInstant);
        JavaRDD<WriteStatus> firstResult = client.bulkInsert(this.jsc.parallelize(this.dataGen.generateInserts(firstInstant, recordsPerBatch), 1), firstInstant);
        Assertions.assertTrue(client.commit(firstInstant, firstResult), "Commit should succeed");
        Assertions.assertTrue(this.testTable.commitExists(firstInstant), "After explicit commit, commit file should be created");

        // One glob per partition so the whole table can be read back.
        String[] partitionPaths = this.dataGen.getPartitionPaths();
        String[] partitionGlobs = new String[partitionPaths.length];
        for (int idx = 0; idx < partitionGlobs.length; idx++) {
            partitionGlobs[idx] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[idx]);
        }
        Assertions.assertEquals(recordsPerBatch, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count(), "Must contain " + recordsPerBatch + " records");

        // Second instant: two operations (bulk insert, then upsert) committed together.
        client.startCommitWithTime(secondInstant);
        JavaRDD<WriteStatus> insertResult = client.bulkInsert(this.jsc.parallelize(this.dataGen.generateInserts(secondInstant, recordsPerBatch), 1), secondInstant);
        JavaRDD<WriteStatus> upsertResult = client.upsert(this.jsc.parallelize(this.dataGen.generateUpdates(secondInstant, recordsPerBatch), 1), secondInstant);
        Assertions.assertTrue(client.commit(secondInstant, insertResult.union(upsertResult)), "Commit should succeed");
        // Check the instant that was just committed; the previous code re-checked the first instant here.
        Assertions.assertTrue(this.testTable.commitExists(secondInstant), "After explicit commit, commit file should be created");

        int expectedTotal = 2 * recordsPerBatch;
        Assertions.assertEquals(expectedTotal, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, partitionGlobs).count(), "Must contain " + expectedTotal + " records");
    }

    /**
     * Convenience overload: builds the small-insert write config for the given insert split size and
     * passes {@code false} for the flag that, further down the overload chain, selects
     * {@code HoodieTestDataGenerator.NULL_SCHEMA}.
     *
     * @param i insert split size
     * @return the write config produced by the two-argument overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i) {
        final boolean useNullSchema = false;
        return getSmallInsertWriteConfig(i, useNullSchema);
    }

    /**
     * Convenience overload: same as the three-boolean-less variant but with the second boolean of the
     * {@code (int, boolean, boolean)} overload fixed to {@code false}; that flag eventually reaches
     * {@code withMergeAllowDuplicateOnInserts}.
     *
     * @param i insert split size
     * @param z when {@code true} the null schema is selected further down the chain
     * @return the write config produced by the {@code (int, boolean, boolean)} overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z) {
        final boolean allowDuplicateInserts = false;
        return getSmallInsertWriteConfig(i, z, allowDuplicateInserts);
    }

    /**
     * Convenience overload that supplies the small-file size limit as the data generator's estimated
     * file size for 150 records.
     *
     * @param i insert split size
     * @param z when {@code true} the null schema is selected further down the chain
     * @param z2 flag that eventually reaches {@code withMergeAllowDuplicateOnInserts}
     * @return the write config produced by the {@code (int, boolean, long, boolean)} overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z, boolean z2) {
        long smallFileSizeLimit = this.dataGen.getEstimatedFileSizeInBytes(150);
        return getSmallInsertWriteConfig(i, z, smallFileSizeLimit, z2);
    }

    /**
     * Convenience overload with an explicit small-file size limit and the trailing boolean of the
     * {@code (int, boolean, long, boolean)} overload fixed to {@code false}.
     *
     * @param i insert split size
     * @param z when {@code true} the null schema is selected further down the chain
     * @param j small-file size limit passed on to the compaction config
     * @return the write config produced by the {@code (int, boolean, long, boolean)} overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z, long j) {
        final boolean allowDuplicateInserts = false;
        return getSmallInsertWriteConfig(i, z, j, allowDuplicateInserts);
    }

    /**
     * Picks the schema string and delegates to the schema-based overload.
     *
     * @param i insert split size
     * @param z when {@code true} uses {@code HoodieTestDataGenerator.NULL_SCHEMA}; otherwise uses the
     *          inline {@code triprec} Avro record schema
     * @param j small-file size limit passed on to the compaction config
     * @param z2 flag that eventually reaches {@code withMergeAllowDuplicateOnInserts}
     * @return the write config produced by the {@code (int, String, long, boolean)} overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z, long j, boolean z2) {
        String schemaStr;
        if (z) {
            schemaStr = HoodieTestDataGenerator.NULL_SCHEMA;
        } else {
            schemaStr = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        }
        return getSmallInsertWriteConfig(i, schemaStr, j, z2);
    }

    /**
     * Convenience overload taking an explicit schema string, with the boolean of the
     * {@code (int, String, long, boolean)} overload fixed to {@code false}.
     *
     * @param i insert split size
     * @param str schema string handed to {@code getConfigBuilder}
     * @param j small-file size limit passed on to the compaction config
     * @return the write config produced by the {@code (int, String, long, boolean)} overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j) {
        final boolean allowDuplicateInserts = false;
        return getSmallInsertWriteConfig(i, str, j, allowDuplicateInserts);
    }

    /**
     * Convenience overload that calls the six-argument variant with its second boolean set to
     * {@code true} (so the index config is left untouched there) and a fresh, empty {@link Properties}.
     *
     * @param i insert split size
     * @param str schema string handed to {@code getConfigBuilder}
     * @param j small-file size limit passed on to the compaction config
     * @param z value for {@code withMergeAllowDuplicateOnInserts}
     * @return the write config produced by the six-argument overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z) {
        final boolean keepDefaultIndex = true;
        Properties noExtraProps = new Properties();
        return getSmallInsertWriteConfig(i, str, j, z, keepDefaultIndex, noExtraProps);
    }

    /**
     * Convenience overload with caller-supplied properties. Note the argument shuffle: the boolean
     * received here becomes the <em>fifth</em> argument of the six-argument variant (the one that
     * decides whether the SIMPLE index is configured), while {@code withMergeAllowDuplicateOnInserts}
     * is fixed to {@code false}.
     *
     * @param i insert split size
     * @param str schema string handed to {@code getConfigBuilder}
     * @param j small-file size limit passed on to the compaction config
     * @param z when {@code false} the six-argument overload switches the index type to SIMPLE
     * @param properties extra properties applied to the config builder
     * @return the write config produced by the six-argument overload
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z, Properties properties) {
        final boolean allowDuplicateInserts = false;
        return getSmallInsertWriteConfig(i, str, j, allowDuplicateInserts, z, properties);
    }

    /**
     * Terminal overload that actually assembles the write config used by the small-file tests.
     *
     * <p>The config uses the LAZY failed-writes cleaning policy, the given small-file limit and insert
     * split size, and caps both HFile and Parquet file sizes at the data generator's estimate for 200
     * records.
     *
     * @param i insert split size
     * @param str schema string handed to {@code getConfigBuilder}
     * @param j small-file size limit for the compaction config
     * @param z value for {@code withMergeAllowDuplicateOnInserts}
     * @param z2 when {@code false} the index type is switched to {@code SIMPLE}
     * @param properties extra properties applied last via {@code withProps}
     * @return the fully built write config
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z, boolean z2, Properties properties) {
        HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder(str);
        if (z2 == false) {
            HoodieIndexConfig simpleIndex = HoodieIndexConfig.newBuilder()
                    .withIndexType(HoodieIndex.IndexType.SIMPLE)
                    .build();
            writeConfigBuilder.withIndexConfig(simpleIndex);
        }

        HoodieCompactionConfig compaction = HoodieCompactionConfig.newBuilder()
                .compactionSmallFileSize(j)
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .insertSplitSize(i)
                .build();
        writeConfigBuilder = writeConfigBuilder.withCompactionConfig(compaction);

        HoodieStorageConfig storage = HoodieStorageConfig.newBuilder()
                .hfileMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
                .parquetMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
                .build();
        writeConfigBuilder = writeConfigBuilder.withStorageConfig(storage);

        return writeConfigBuilder
                .withMergeAllowDuplicateOnInserts(z)
                .withProps(properties)
                .build();
    }

    /**
     * Writes a REQUESTED {@code replacecommit} instant to the active timeline, carrying a clustering
     * plan built from the given file-slice groups.
     *
     * <p>The plan uses the default value of {@code HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME},
     * {@code STRATEGY_PARAMS}, and an empty extra-metadata map; the request is tagged with the
     * {@code CLUSTER} operation type.
     *
     * @param hoodieTableMetaClient meta client whose active timeline receives the pending instant
     * @param str instant time of the new replace commit
     * @param listArr file-slice groups handed to {@code ClusteringUtils.createClusteringPlan}
     * @return the requested instant that was saved
     * @throws IOException if serializing or saving the requested metadata fails
     */
    protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient hoodieTableMetaClient, String str, List<FileSlice>[] listArr) throws IOException {
        String executionStrategy = (String) HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.defaultValue();
        HoodieClusteringPlan clusteringPlan = ClusteringUtils.createClusteringPlan(executionStrategy, STRATEGY_PARAMS, listArr, Collections.emptyMap());

        HoodieRequestedReplaceMetadata requestedMetadata = HoodieRequestedReplaceMetadata.newBuilder()
                .setClusteringPlan(clusteringPlan)
                .setOperationType(WriteOperationType.CLUSTER.name())
                .build();

        HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "replacecommit", str);
        hoodieTableMetaClient.getActiveTimeline().saveToPendingReplaceCommit(
                requestedInstant,
                TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedMetadata));
        return requestedInstant;
    }

    /**
     * Builds the write config shared by the parallel-writer tests: embedded timeline server disabled,
     * auto-clean and auto-commit off, timeline layout version 1, a 3000 ms heartbeat interval, and the
     * remote file-system view pointed at {@code timelineServicePort}.
     *
     * @param hoodieFailedWritesCleaningPolicy cleaning policy applied to failed writes
     * @param z when {@code true} an empty {@link Properties} is used; otherwise the result of
     *          {@code getPropertiesForKeyGen()}
     * @return the assembled write config
     */
    private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy, boolean z) {
        HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false);

        HoodieCompactionConfig compaction = HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
                .withAutoClean(false)
                .build();
        writeConfigBuilder = writeConfigBuilder
                .withCompactionConfig(compaction)
                .withTimelineLayoutVersion(1)
                .withHeartbeatIntervalInMs(3000);

        FileSystemViewStorageConfig viewStorage = FileSystemViewStorageConfig.newBuilder()
                .withRemoteServerPort(Integer.valueOf(timelineServicePort))
                .build();
        writeConfigBuilder = writeConfigBuilder
                .withFileSystemViewConfig(viewStorage)
                .withAutoCommit(false);

        Properties extraProps;
        if (z) {
            extraProps = new Properties();
        } else {
            extraProps = getPropertiesForKeyGen();
        }
        return writeConfigBuilder.withProperties(extraProps).build();
    }

    /*
     * Synthetic lambda-deserialization hook. It looks at the implementation method name recorded in the
     * SerializedLambda and, for "lambda$testUpsertsInternal$578638dc$1", re-creates the corresponding
     * Spark Function from the six captured arguments. Any other lambda is rejected with an
     * IllegalArgumentException.
     *
     * NOTE(review): this looks like decompiler output rather than hand-written code -- e.g. a boolean
     * initialised to -1 and a switch over a boolean are not valid Java source. Treat it as a readable
     * rendering of compiler-generated code and avoid editing it by hand.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: map the implementation method name (hash first, then equals) to a case selector.
        switch (implMethodName.hashCode()) {
            case -2070688381:
                if (implMethodName.equals("lambda$testUpsertsInternal$578638dc$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Stage 2: verify the full lambda signature before rebuilding it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/table/HoodieTableMetaClient;Ljava/lang/String;Ljava/lang/String;Lorg/apache/hudi/config/HoodieWriteConfig;Lorg/apache/hudi/table/HoodieTable;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/Integer;)Ljava/lang/Boolean;")) {
                    // Captured state of the lambda, in capture order.
                    HoodieTableMetaClient hoodieTableMetaClient = (HoodieTableMetaClient) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    String str2 = (String) serializedLambda.getCapturedArg(2);
                    HoodieWriteConfig hoodieWriteConfig = (HoodieWriteConfig) serializedLambda.getCapturedArg(3);
                    HoodieTable hoodieTable = (HoodieTable) serializedLambda.getCapturedArg(4);
                    HoodieWriteConfig hoodieWriteConfig2 = (HoodieWriteConfig) serializedLambda.getCapturedArg(5);
                    return num -> {
                        // Load the metadata of the last completed commit.
                        HoodieCommitMetadata hoodieCommitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) hoodieTableMetaClient.getActiveTimeline().getInstantDetails((HoodieInstant) hoodieTableMetaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(), HoodieCommitMetadata.class);
                        // Path of any write stat whose path ends with the captured suffix (null if none).
                        String str22 = (String) hoodieCommitMetadata.getPartitionToWriteStats().values().stream().flatMap(list2 -> {
                            return list2.stream();
                        }).filter(hoodieWriteStat -> {
                            return hoodieWriteStat.getPath().endsWith(str);
                        }).findAny().map(hoodieWriteStat2 -> {
                            return hoodieWriteStat2.getPath();
                        }).orElse(null);
                        // Partition path of a write stat matching the same suffix (null if none).
                        String str3 = (String) hoodieCommitMetadata.getPartitionToWriteStats().values().stream().flatMap(list3 -> {
                            return list3.stream();
                        }).filter(hoodieWriteStat3 -> {
                            return hoodieWriteStat3.getPath().endsWith(str);
                        }).findAny().map(hoodieWriteStat4 -> {
                            return hoodieWriteStat4.getPartitionPath();
                        }).orElse(null);
                        Path path = new Path(str2, str22);
                        HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(path.toString());
                        // First pass: with the captured config as-is, the validation check must not throw.
                        try {
                            HoodieMergeHandle hoodieMergeHandle = new HoodieMergeHandle(hoodieWriteConfig, "007", hoodieTable, new HashMap(), str3, FSUtils.getFileId(path.getName()), hoodieBaseFile, new SparkTaskContextSupplier(), hoodieWriteConfig2.populateMetaFields() ? Option.empty() : Option.of(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieWriteConfig2.getProps()))));
                            WriteStatus writeStatus = new WriteStatus(false, Double.valueOf(0.0d));
                            writeStatus.setStat(new HoodieWriteStat());
                            writeStatus.getStat().setNumWrites(0L);
                            hoodieMergeHandle.performMergeDataValidationCheck(writeStatus);
                        } catch (HoodieCorruptedDataException e) {
                            Assertions.fail("Exception not expected because merge validation check is disabled");
                        }
                        // Second pass: turn merge data validation on; the same check is now expected to throw.
                        try {
                            hoodieWriteConfig.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
                            HoodieMergeHandle hoodieMergeHandle2 = new HoodieMergeHandle(HoodieWriteConfig.newBuilder().withProps(hoodieWriteConfig.getProps()).build(), "006", hoodieTable, new HashMap(), str3, FSUtils.getFileId(path.getName()), hoodieBaseFile, new SparkTaskContextSupplier(), hoodieWriteConfig2.populateMetaFields() ? Option.empty() : Option.of(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieWriteConfig2.getProps()))));
                            WriteStatus writeStatus2 = new WriteStatus(false, Double.valueOf(0.0d));
                            writeStatus2.setStat(new HoodieWriteStat());
                            writeStatus2.getStat().setNumWrites(0L);
                            hoodieMergeHandle2.performMergeDataValidationCheck(writeStatus2);
                            Assertions.fail("The above line should have thrown an exception");
                        } catch (HoodieCorruptedDataException e2) {
                            // Expected outcome: the validation check rejected the data, so there is nothing to do.
                        }
                        return true;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
