package org.apache.hudi.io;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/io/TestHoodieMergeHandle.class */
public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness {

    /* loaded from: input_file:org/apache/hudi/io/TestHoodieMergeHandle$TestWriteStatus.class */
    public static class TestWriteStatus extends WriteStatus {
        public TestWriteStatus(Boolean bool, Double d) {
            super(true, d);
        }
    }

    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts();
        initPath();
        initFileSystem();
        initTestDataGenerator();
        initMetaClient();
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMapType diskMapType, boolean z) throws Exception {
        this.dataGen = new HoodieTestDataGenerator(new String[]{HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]});
        Properties properties = new Properties();
        properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
        properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(z));
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder().withProperties(properties).build());
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = this.dataGen.generateInserts("001", 4);
                HoodieRecord hoodieRecord = (HoodieRecord) generateInserts.get(0);
                HoodieRecord hoodieRecord2 = (HoodieRecord) generateInserts.get(1);
                for (int i = 0; i < 20; i++) {
                    generateInserts.add(this.dataGen.generateUpdateRecord(hoodieRecord.getKey(), "001"));
                }
                for (int i2 = 0; i2 < 20; i2++) {
                    generateInserts.add(this.dataGen.generateUpdateRecord(hoodieRecord2.getKey(), "001"));
                }
                Assertions.assertNoWriteErrors(hoodieWriteClient.bulkInsert(this.jsc.parallelize(generateInserts, 1), "001").collect());
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                HoodieTimeline commitTimeline = new HoodieActiveTimeline(this.metaClient).getCommitTimeline();
                org.junit.jupiter.api.Assertions.assertEquals(1, commitTimeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
                org.junit.jupiter.api.Assertions.assertEquals("001", ((HoodieInstant) commitTimeline.lastInstant().get()).getTimestamp(), "Latest commit should be 001");
                org.junit.jupiter.api.Assertions.assertEquals(generateInserts.size(), HoodieClientTestUtils.readCommit(this.basePath, this.sqlContext, commitTimeline, "001").count(), "Must contain 44 records");
                hoodieWriteClient.startCommitWithTime("002");
                ArrayList arrayList = new ArrayList();
                arrayList.add(this.dataGen.generateUpdateRecord(hoodieRecord.getKey(), "002"));
                Assertions.assertNoWriteErrors(hoodieWriteClient.bulkInsert(this.jsc.parallelize(arrayList, 1), "002").collect());
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                HoodieTimeline commitTimeline2 = new HoodieActiveTimeline(this.metaClient).getCommitTimeline();
                org.junit.jupiter.api.Assertions.assertEquals(2, commitTimeline2.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting two commits.");
                org.junit.jupiter.api.Assertions.assertEquals("002", ((HoodieInstant) commitTimeline2.lastInstant().get()).getTimestamp(), "Latest commit should be 002");
                org.junit.jupiter.api.Assertions.assertEquals(45L, getRecords().count(), "Must contain 45 records");
                hoodieWriteClient.startCommitWithTime("003");
                Assertions.assertNoWriteErrors(hoodieWriteClient.bulkInsert(this.jsc.parallelize(this.dataGen.generateInserts("003", 2), 1), "003").collect());
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                HoodieTimeline commitTimeline3 = new HoodieActiveTimeline(this.metaClient).getCommitTimeline();
                org.junit.jupiter.api.Assertions.assertEquals(3, commitTimeline3.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting three commits.");
                org.junit.jupiter.api.Assertions.assertEquals("003", ((HoodieInstant) commitTimeline3.lastInstant().get()).getTimestamp(), "Latest commit should be 003");
                org.junit.jupiter.api.Assertions.assertEquals(47L, getRecords().count(), "Must contain 47 records");
                hoodieWriteClient.startCommitWithTime("004");
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(this.dataGen.generateUpdateRecord(hoodieRecord.getKey(), "004"));
                arrayList2.add(this.dataGen.generateUpdateRecord(hoodieRecord2.getKey(), "004"));
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(this.jsc.parallelize(arrayList2, 1), "004").collect());
                HoodieTimeline commitTimeline4 = new HoodieActiveTimeline(this.metaClient).getCommitTimeline();
                org.junit.jupiter.api.Assertions.assertEquals(4, commitTimeline4.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting four commits.");
                org.junit.jupiter.api.Assertions.assertEquals(((HoodieInstant) commitTimeline4.lastInstant().get()).getTimestamp(), "004", "Latest commit should be 004");
                Dataset<Row> records = getRecords();
                org.junit.jupiter.api.Assertions.assertEquals(47L, records.count(), "Must contain 47 records");
                int i3 = 0;
                int i4 = 0;
                for (Row row : (Row[]) records.collect()) {
                    if (row.getAs("_hoodie_record_key").equals(hoodieRecord.getKey().getRecordKey())) {
                        i3++;
                        org.junit.jupiter.api.Assertions.assertEquals(row.getAs("rider"), "rider-004");
                        org.junit.jupiter.api.Assertions.assertEquals(row.getAs("driver"), "driver-004");
                    } else if (row.getAs("_hoodie_record_key").equals(hoodieRecord2.getKey().getRecordKey())) {
                        i4++;
                        org.junit.jupiter.api.Assertions.assertEquals(row.getAs("rider"), "rider-004");
                        org.junit.jupiter.api.Assertions.assertEquals(row.getAs("driver"), "driver-004");
                    } else {
                        org.junit.jupiter.api.Assertions.assertNotEquals(row.getAs("rider"), "rider-004");
                        org.junit.jupiter.api.Assertions.assertNotEquals(row.getAs("driver"), "rider-004");
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(22, i3);
                org.junit.jupiter.api.Assertions.assertEquals(21, i4);
                org.junit.jupiter.api.Assertions.assertEquals((Set) getHoodieTableFileSystemView(this.metaClient, this.metaClient.getActiveTimeline(), HoodieTestTable.of(this.metaClient).listAllBaseFiles()).getLatestBaseFiles().map((v0) -> {
                    return v0.getFileName();
                }).collect(Collectors.toSet()), (Set) records.collectAsList().stream().map(row2 -> {
                    return row2.getAs(HoodieRecord.FILENAME_METADATA_FIELD);
                }).collect(Collectors.toSet()));
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testHoodieMergeHandleWriteStatMetrics(ExternalSpillableMap.DiskMapType diskMapType, boolean z) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
        properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(z));
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder().withProperties(properties).build());
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("100");
                List generateInserts = this.dataGen.generateInserts("100", 100);
                List collect = hoodieWriteClient.insert(this.jsc.parallelize(generateInserts, 1), "100").collect();
                org.junit.jupiter.api.Assertions.assertTrue(collect.stream().filter(writeStatus -> {
                    return writeStatus.getStat().getPrevCommit() != "null";
                }).count() > 0);
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect.stream().map(writeStatus2 -> {
                    return Long.valueOf(writeStatus2.getStat().getNumWrites());
                }).reduce((l, l2) -> {
                    return Long.valueOf(l.longValue() + l2.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(0L, ((Long) collect.stream().map(writeStatus3 -> {
                    return Long.valueOf(writeStatus3.getStat().getNumUpdateWrites());
                }).reduce((l3, l4) -> {
                    return Long.valueOf(l3.longValue() + l4.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect.stream().map(writeStatus4 -> {
                    return Long.valueOf(writeStatus4.getStat().getNumInserts());
                }).reduce((l5, l6) -> {
                    return Long.valueOf(l5.longValue() + l6.longValue());
                }).get()).longValue());
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                hoodieWriteClient.startCommitWithTime("101");
                List generateUpdates = this.dataGen.generateUpdates("101", generateInserts);
                List collect2 = hoodieWriteClient.upsert(this.jsc.parallelize(generateUpdates, 1), "101").collect();
                org.junit.jupiter.api.Assertions.assertEquals(0L, collect2.stream().filter(writeStatus5 -> {
                    return writeStatus5.getStat().getPrevCommit() == "null";
                }).count());
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect2.stream().map(writeStatus6 -> {
                    return Long.valueOf(writeStatus6.getStat().getNumWrites());
                }).reduce((l7, l8) -> {
                    return Long.valueOf(l7.longValue() + l8.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect2.stream().map(writeStatus7 -> {
                    return Long.valueOf(writeStatus7.getStat().getNumUpdateWrites());
                }).reduce((l9, l10) -> {
                    return Long.valueOf(l9.longValue() + l10.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(0L, ((Long) collect2.stream().map(writeStatus8 -> {
                    return Long.valueOf(writeStatus8.getStat().getNumInserts());
                }).reduce((l11, l12) -> {
                    return Long.valueOf(l11.longValue() + l12.longValue());
                }).get()).longValue());
                hoodieWriteClient.startCommitWithTime("102");
                List generateInserts2 = this.dataGen.generateInserts("102", 100);
                generateInserts2.addAll(generateUpdates);
                List collect3 = hoodieWriteClient.upsert(this.jsc.parallelize(generateInserts2, 1), "102").collect();
                org.junit.jupiter.api.Assertions.assertEquals(0L, collect3.stream().filter(writeStatus9 -> {
                    return writeStatus9.getStat().getPrevCommit() == "null";
                }).count());
                org.junit.jupiter.api.Assertions.assertEquals(200L, ((Long) collect3.stream().map(writeStatus10 -> {
                    return Long.valueOf(writeStatus10.getStat().getNumWrites());
                }).reduce((l13, l14) -> {
                    return Long.valueOf(l13.longValue() + l14.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect3.stream().map(writeStatus11 -> {
                    return Long.valueOf(writeStatus11.getStat().getNumUpdateWrites());
                }).reduce((l15, l16) -> {
                    return Long.valueOf(l15.longValue() + l16.longValue());
                }).get()).longValue());
                org.junit.jupiter.api.Assertions.assertEquals(100L, ((Long) collect3.stream().map(writeStatus12 -> {
                    return Long.valueOf(writeStatus12.getStat().getNumInserts());
                }).reduce((l17, l18) -> {
                    return Long.valueOf(l17.longValue() + l18.longValue());
                }).get()).longValue());
                collect3.forEach(writeStatus13 -> {
                    writeStatus13.getWrittenRecordDelegates().forEach(hoodieRecordDelegate -> {
                        org.junit.jupiter.api.Assertions.assertTrue(hoodieRecordDelegate.getNewLocation().isPresent());
                    });
                });
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    private Dataset<Row> getRecords() {
        String[] strArr = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = Paths.get(this.basePath, this.dataGen.getPartitionPaths()[i], "*").toString();
        }
        return HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.fs, strArr);
    }

    protected HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1048576L).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1048576L).parquetMaxFileSize(1048576L).build()).forTable("test-trip-table").withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class);
    }

    private static Stream<Arguments> testArguments() {
        return Stream.of((Object[]) new Arguments[]{Arguments.arguments(new Object[]{ExternalSpillableMap.DiskMapType.BITCASK, false}), Arguments.arguments(new Object[]{ExternalSpillableMap.DiskMapType.ROCKS_DB, false}), Arguments.arguments(new Object[]{ExternalSpillableMap.DiskMapType.BITCASK, true}), Arguments.arguments(new Object[]{ExternalSpillableMap.DiskMapType.ROCKS_DB, true})});
    }
}
