package org.apache.hudi.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.class */
/**
 * Functional tests that write records through a {@link SparkRDDWriteClient} configured with a
 * global index type ({@code GLOBAL_SIMPLE}, {@code GLOBAL_BLOOM} or {@code RECORD_INDEX}) and with
 * the "update partition path" switches enabled, then read the table back and check which
 * partition and {@code ts} value each record key ends up with.
 *
 * <p>Every test runs once per table type / index type combination supplied by
 * {@link #getTableTypeAndIndexType()}.
 */
public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctionalTestHarness {

    /** Builds the Spark configuration from the harness' Spark SQL settings. */
    public SparkConf conf() {
        return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
    }

    /** Supplies every (table type, global index type) pair the parameterized tests run against. */
    private static Stream<Arguments> getTableTypeAndIndexType() {
        return Stream.of(
                Arguments.of(HoodieTableType.COPY_ON_WRITE, HoodieIndex.IndexType.GLOBAL_SIMPLE),
                Arguments.of(HoodieTableType.COPY_ON_WRITE, HoodieIndex.IndexType.GLOBAL_BLOOM),
                Arguments.of(HoodieTableType.COPY_ON_WRITE, HoodieIndex.IndexType.RECORD_INDEX),
                Arguments.of(HoodieTableType.MERGE_ON_READ, HoodieIndex.IndexType.GLOBAL_SIMPLE),
                Arguments.of(HoodieTableType.MERGE_ON_READ, HoodieIndex.IndexType.GLOBAL_BLOOM),
                Arguments.of(HoodieTableType.MERGE_ON_READ, HoodieIndex.IndexType.RECORD_INDEX));
    }

    /**
     * Moves the same four records through partitions p1, p2, p3 and back to p1, and checks after
     * each write that all of them are found only in the expected partition with the expected ts.
     * One write in between targets p4 with ts 2 (lower than the ts 7 already stored); the table is
     * expected to stay at p3 / ts 7 after it.
     */
    @MethodSource({"getTableTypeAndIndexType"})
    @ParameterizedTest
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testPartitionChanges(HoodieTableType hoodieTableType, HoodieIndex.IndexType indexType) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfig(DefaultHoodieRecordPayload.class, indexType);
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(hoodieTableType, writeConfig.getProps());
        int[] allIds = {0, 1, 2, 3};
        // try-with-resources closes the client on every path and keeps the test failure as the
        // primary exception if close() fails as well.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(writeConfig)) {
            // 1st batch: 4 inserts into p1 at ts 0
            List insertsAtTs0 = HoodieAdaptablePayloadDataGenerator.getInserts(4, "p1", 0L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, insertsAtTs0, 2, 0L);

            // 2nd batch: update all records with ts 5, no partition argument given
            List updatesAtTs5 = HoodieAdaptablePayloadDataGenerator.getUpdates(insertsAtTs0, 5L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesAtTs5, 2, 5L);
            readTableAndValidate(hoodieMetaClient, allIds, "p1", 5L);

            // 3rd batch: updates targeting p2 with ts 6
            List updatesToP2 = HoodieAdaptablePayloadDataGenerator.getUpdates(updatesAtTs5, "p2", 6L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesToP2, 2, 6L);
            readTableAndValidate(hoodieMetaClient, allIds, "p2", 6L);

            // 4th batch: updates targeting p3 with ts 7
            List updatesToP3 = HoodieAdaptablePayloadDataGenerator.getUpdates(updatesToP2, "p3", 7L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesToP3, 2, 7L);
            readTableAndValidate(hoodieMetaClient, allIds, "p3", 7L);

            // 5th batch: updates targeting p4 but carrying ts 2; expected to leave p3 / ts 7 in place
            List staleUpdatesToP4 = HoodieAdaptablePayloadDataGenerator.getUpdates(insertsAtTs0, "p4", 2L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, staleUpdatesToP4, 2, 8L);
            readTableAndValidate(hoodieMetaClient, allIds, "p3", 7L);

            // 6th batch: updates targeting p1 again with ts 9
            List updatesBackToP1 = HoodieAdaptablePayloadDataGenerator.getUpdates(updatesToP3, "p1", 9L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesBackToP1, 2, 9L);
            readTableAndValidate(hoodieMetaClient, allIds, "p1", 9L);
        }
    }

    /**
     * Moves four records from p1 to p2, deletes some of them through delete batches that name a
     * partition, and finally upserts all four keys again into p1.
     */
    @MethodSource({"getTableTypeAndIndexType"})
    @ParameterizedTest
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testUpdatePartitionsThenDelete(HoodieTableType hoodieTableType, HoodieIndex.IndexType indexType) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfig(DefaultHoodieRecordPayload.class, indexType);
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(hoodieTableType, writeConfig.getProps());
        int[] allIds = {0, 1, 2, 3};
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(writeConfig)) {
            // 1st batch: 4 inserts into p1 at ts 0
            List insertsAtTs0 = HoodieAdaptablePayloadDataGenerator.getInserts(4, "p1", 0L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, insertsAtTs0, 2, 0L);

            // 2nd batch: update all records with ts 5, no partition argument given
            List updatesAtTs5 = HoodieAdaptablePayloadDataGenerator.getUpdates(insertsAtTs0, 5L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesAtTs5, 2, 5L);
            readTableAndValidate(hoodieMetaClient, allIds, "p1", 5L);

            // 3rd batch: updates targeting p2 with ts 6
            List updatesToP2 = HoodieAdaptablePayloadDataGenerator.getUpdates(updatesAtTs5, "p2", 6L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesToP2, 2, 6L);
            readTableAndValidate(hoodieMetaClient, allIds, "p2", 6L);

            // 4th batch: deletes for the first two records, built with partition p2 and ts 7
            List deletesInP2 = HoodieAdaptablePayloadDataGenerator.getDeletesWithNewPartition(
                    insertsAtTs0.subList(0, 2), "p2", 7L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, deletesInP2, 2, 7L);
            readTableAndValidate(hoodieMetaClient, new int[]{2, 3}, "p2", 6L);

            // 5th batch: empty-payload delete for the third record, built with partition "unknown_pt"
            List deletesInUnknownPartition = HoodieAdaptablePayloadDataGenerator.getDeletesWithEmptyPayloadAndNewPartition(
                    insertsAtTs0.subList(2, 3), "unknown_pt");
            upsertAndAssertNoErrors(hoodieWriteClient, deletesInUnknownPartition, 1, 8L);
            readTableAndValidate(hoodieMetaClient, new int[]{3}, "p2", 6L);

            // 6th batch: upsert all four keys into p1 with ts 9; all four are expected to be readable again
            List updatesBackToP1 = HoodieAdaptablePayloadDataGenerator.getUpdates(insertsAtTs0, "p1", 9L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesBackToP1, 2, 9L);
            readTableAndValidate(hoodieMetaClient, allIds, "p1", 9L);
        }
    }

    /**
     * Writes batches that cover only a subset of the generated records, so that a single batch
     * contains both keys already in the table and keys written for the first time, and finally
     * moves all records to p2.
     */
    @MethodSource({"getTableTypeAndIndexType"})
    @ParameterizedTest
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testUdpateSubsetOfRecUpdates(HoodieTableType hoodieTableType, HoodieIndex.IndexType indexType) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfig(DefaultHoodieRecordPayload.class, indexType);
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(hoodieTableType, writeConfig.getProps());
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(writeConfig)) {
            List allInserts = HoodieAdaptablePayloadDataGenerator.getInserts(4, "p1", 0L, DefaultHoodieRecordPayload.class);

            // 1st batch: only the first two generated records are written
            upsertAndAssertNoErrors(hoodieWriteClient, allInserts.subList(0, 2), 2, 0L);
            readTableAndValidate(hoodieMetaClient, new int[]{0, 1}, "p1", 0L);

            // 2nd batch: first three records at ts 5 (two already written, one new key)
            List updatesAtTs5 = HoodieAdaptablePayloadDataGenerator.getUpdates(allInserts.subList(0, 3), 5L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesAtTs5, 2, 5L);
            readTableAndValidate(hoodieMetaClient, new int[]{0, 1, 2}, "p1",
                    getExpectedTsMap(new int[]{0, 1, 2}, new Long[]{5L, 5L, 5L}));

            // 3rd batch: all four records at ts 10 (three already written, one new key)
            List updatesAtTs10 = HoodieAdaptablePayloadDataGenerator.getUpdates(allInserts, 10L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesAtTs10, 2, 10L);
            readTableAndValidate(hoodieMetaClient, new int[]{0, 1, 2, 3}, "p1",
                    getExpectedTsMap(new int[]{0, 1, 2, 3}, new Long[]{10L, 10L, 10L, 10L}));

            // 4th batch: all four records targeting p2 with ts 20
            List updatesToP2 = HoodieAdaptablePayloadDataGenerator.getUpdates(allInserts, "p2", 20L, DefaultHoodieRecordPayload.class);
            upsertAndAssertNoErrors(hoodieWriteClient, updatesToP2, 2, 20L);
            readTableAndValidate(hoodieMetaClient, new int[]{0, 1, 2, 3}, "p2", 20L);
        }
    }

    /**
     * Starts a commit, upserts {@code records} in it and asserts that no write errors were reported.
     *
     * <p>Raw {@code List} / {@code SparkRDDWriteClient} types are used because the record element
     * type is not imported in this file.
     *
     * @param client      write client to upsert through
     * @param records     records to write (inserts, updates or deletes from the data generator)
     * @param parallelism number of slices passed to {@code parallelize}
     * @param commitEpoch value handed to {@code HoodieTestDataGenerator.getCommitTimeAtUTC} to
     *                    derive the commit time
     * @throws IOException declared so any checked I/O failure from the write path reaches the test
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void upsertAndAssertNoErrors(SparkRDDWriteClient client, List records, int parallelism, long commitEpoch)
            throws IOException {
        String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(commitEpoch);
        client.startCommitWithTime(commitTime);
        Assertions.assertNoWriteErrors(client.upsert(jsc().parallelize(records, parallelism), commitTime).collect());
    }

    /**
     * Pairs each id with the ts at the same index.
     *
     * @param ids        record ids; each is converted to its decimal string to form the map key
     * @param timestamps expected ts per id; must have at least {@code ids.length} elements
     * @return map from id string to expected ts
     */
    private Map<String, Long> getExpectedTsMap(int[] ids, Long[] timestamps) {
        Map<String, Long> expectedTsById = new HashMap<>();
        for (int idx = 0; idx < ids.length; idx++) {
            expectedTsById.put(String.valueOf(ids[idx]), timestamps[idx]);
        }
        return expectedTsById;
    }

    /**
     * Validates the table when every expected record carries the same ts.
     *
     * @param metaClient        meta client whose base path is read
     * @param expectedIds       ids expected in the table, in ascending order
     * @param expectedPartition partition every record is expected to be in
     * @param expectedTs        ts expected on every record
     */
    private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expectedIds, String expectedPartition, long expectedTs) {
        Map<String, Long> expectedTsById = new HashMap<>();
        // Each id must be mapped to the shared ts; with an empty map the ts assertion in the
        // delegate would compare against null for every row.
        Arrays.stream(expectedIds).forEach(id -> expectedTsById.put(String.valueOf(id), expectedTs));
        readTableAndValidate(metaClient, expectedIds, expectedPartition, expectedTsById);
    }

    /**
     * Reads the table sorted by {@code id} and asserts that it holds exactly the expected records,
     * all in {@code expectedPartition}, each with the ts given for its id.
     *
     * @param metaClient        meta client whose base path is read
     * @param expectedIds       ids expected in the table, in ascending order (rows are sorted by id)
     * @param expectedPartition value expected in both {@code _hoodie_partition_path} and {@code pt}
     * @param expectedTsById    expected ts keyed by the id's decimal string
     */
    private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expectedIds, String expectedPartition, Map<String, Long> expectedTsById) {
        Dataset<Row> rows = spark().read().format("hudi")
                .load(metaClient.getBasePath().toString())
                .sort("id")
                .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts")
                .cache();
        int expectedCount = expectedIds.length;
        // Total count equal to the count within the expected partition means no record is
        // visible in any other partition.
        org.junit.jupiter.api.Assertions.assertEquals(expectedCount, rows.count());
        org.junit.jupiter.api.Assertions.assertEquals(expectedCount, rows.filter(String.format("pt = '%s'", expectedPartition)).count());
        Row[] collected = (Row[]) rows.collect();
        for (int idx = 0; idx < expectedCount; idx++) {
            int expectedId = expectedIds[idx];
            Row row = collected[idx];
            org.junit.jupiter.api.Assertions.assertEquals(String.valueOf(expectedId), row.getString(0));
            org.junit.jupiter.api.Assertions.assertEquals(expectedPartition, row.getString(1));
            org.junit.jupiter.api.Assertions.assertEquals(expectedId, row.getInt(2));
            org.junit.jupiter.api.Assertions.assertEquals(expectedPartition, row.getString(3));
            org.junit.jupiter.api.Assertions.assertEquals(expectedTsById.get(String.valueOf(expectedId)), row.getLong(4));
        }
        rows.unpersist();
    }

    /**
     * Builds the write config used by every test: the given index type with all
     * "update partition path" switches turned on and a parallelism of 2 throughout.
     *
     * @param payloadClass payload class used to derive the key generator and payload properties
     * @param indexType    index type under test; the metadata table (with record index) is
     *                     enabled only for {@code RECORD_INDEX}
     * @return the assembled write config
     */
    private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, HoodieIndex.IndexType indexType) {
        HoodieMetadataConfig.Builder metadataConfigBuilder = HoodieMetadataConfig.newBuilder();
        if (indexType == HoodieIndex.IndexType.RECORD_INDEX) {
            metadataConfigBuilder.enable(true).withEnableRecordIndex(true);
        } else {
            metadataConfigBuilder.enable(false);
        }
        return getConfigBuilder(true)
                .withProperties(HoodieAdaptablePayloadDataGenerator.getKeyGenProps(payloadClass))
                .withParallelism(2, 2)
                .withBulkInsertParallelism(2)
                .withDeleteParallelism(2)
                .withMetadataConfig(metadataConfigBuilder.build())
                .withIndexConfig(HoodieIndexConfig.newBuilder()
                        .withIndexType(indexType)
                        .bloomIndexParallelism(2)
                        .withSimpleIndexParallelism(2)
                        .withGlobalSimpleIndexParallelism(2)
                        .withGlobalIndexReconcileParallelism(2)
                        .withGlobalBloomIndexUpdatePartitionPath(true)
                        .withGlobalSimpleIndexUpdatePartitionPath(true)
                        .withRecordIndexUpdatePartitionPath(true)
                        .build())
                .withSchema(HoodieAdaptablePayloadDataGenerator.SCHEMA_STR)
                .withPayloadConfig(HoodiePayloadConfig.newBuilder()
                        .fromProperties(HoodieAdaptablePayloadDataGenerator.getPayloadProps(payloadClass))
                        .build())
                .build();
    }
}
