package org.apache.hudi.table;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/table/TestHoodieMergeOnReadTable.class */
public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness {
    private HoodieTableMetaClient metaClient;
    private HoodieTestDataGenerator dataGen;

    void setUp(Properties properties) throws IOException {
        Properties copy = CollectionUtils.copy(properties);
        copy.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, copy);
        this.dataGen = new HoodieTestDataGenerator();
    }

    @Test
    public void testMetadataAggregateFromWriteStatus() throws Exception {
        HoodieWriteConfig build = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
        setUp(build.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                List generateInserts = this.dataGen.generateInserts("001", 200);
                JavaRDD parallelize = jsc().parallelize(generateInserts, 1);
                hoodieWriteClient.startCommitWithTime("001");
                List collect = hoodieWriteClient.upsert(parallelize, "001").collect();
                Assertions.assertNoWriteErrors(collect);
                Map mergeMetadataForWriteStatuses = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(collect);
                org.junit.jupiter.api.Assertions.assertTrue(mergeMetadataForWriteStatuses.containsKey("InputRecordCount_1506582000"));
                org.junit.jupiter.api.Assertions.assertEquals(String.valueOf(2 * generateInserts.size()), mergeMetadataForWriteStatuses.get("InputRecordCount_1506582000"));
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testUpsertPartitioner(boolean z) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(configBuilder, z);
        HoodieWriteConfig build = configBuilder.build();
        setUp(build.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = this.dataGen.generateInserts("001", 20);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "001").collect());
                HoodieSparkTable create = HoodieSparkTable.create(build, m43context(), this.metaClient);
                Option firstInstant = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
                org.junit.jupiter.api.Assertions.assertTrue(firstInstant.isPresent());
                org.junit.jupiter.api.Assertions.assertEquals("001", ((HoodieInstant) firstInstant.get()).getTimestamp(), "Delta commit should be 001");
                org.junit.jupiter.api.Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
                FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
                Map map = (Map) getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().collect(Collectors.toMap((v0) -> {
                    return v0.getFileId();
                }, (v0) -> {
                    return v0.getFileSize();
                }));
                org.junit.jupiter.api.Assertions.assertTrue(((List) getHoodieTableFileSystemView(this.metaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath).getLatestBaseFiles().collect(Collectors.toList())).size() > 0, "Should list the base files we wrote in the delta commit");
                hoodieWriteClient.startCommitWithTime("002");
                List generateUpdates = this.dataGen.generateUpdates("002", generateInserts);
                generateUpdates.addAll(this.dataGen.generateInserts("002", 20));
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(generateUpdates), "002").collect());
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                Option lastInstant = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
                org.junit.jupiter.api.Assertions.assertTrue(lastInstant.isPresent());
                org.junit.jupiter.api.Assertions.assertEquals("002", ((HoodieInstant) lastInstant.get()).getTimestamp(), "Latest Delta commit should be 002");
                org.junit.jupiter.api.Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
                HoodieTableFileSystemView hoodieTableFileSystemView = getHoodieTableFileSystemView(this.metaClient, create.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), listAllBaseFilesInPath(create));
                org.junit.jupiter.api.Assertions.assertTrue(((Map) ((List) hoodieTableFileSystemView.getLatestBaseFiles().collect(Collectors.toList())).stream().collect(Collectors.toMap((v0) -> {
                    return v0.getFileId();
                }, (v0) -> {
                    return v0.getFileSize();
                }))).entrySet().stream().anyMatch(entry -> {
                    return ((Long) map.get(entry.getKey())).longValue() < ((Long) entry.getValue()).longValue();
                }));
                org.junit.jupiter.api.Assertions.assertEquals(40, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) hoodieTableFileSystemView.getLatestBaseFiles().map(hoodieBaseFile -> {
                    return new Path(hoodieBaseFile.getPath()).getParent().toString();
                }).collect(Collectors.toList()), basePath(), new JobConf(hadoopConf()), true, false).size(), "Must contain 40 records");
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testLogFileCountsAfterCompaction(boolean z) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM, 1073741824L, HoodieClusteringConfig.newBuilder().build(), z);
        addConfigsForPopulateMetaFields(configBuilder, true);
        HoodieWriteConfig build = configBuilder.build();
        setUp(build.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(build);
        Throwable th = null;
        try {
            hoodieWriteClient.startCommitWithTime("100");
            List generateInserts = this.dataGen.generateInserts("100", 100);
            hoodieWriteClient.insert(jsc().parallelize(generateInserts, 1), "100").collect();
            List generateUpdates = this.dataGen.generateUpdates("101", generateInserts);
            JavaRDD tagLocation = new SparkRDDReadClient(m43context(), build).tagLocation(jsc().parallelize(generateUpdates, 1));
            hoodieWriteClient.startCommitWithTime("101");
            hoodieWriteClient.upsertPreppedRecords(tagLocation, "101").collect();
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            org.junit.jupiter.api.Assertions.assertEquals(((Map) generateUpdates.stream().map(hoodieRecord -> {
                return hoodieRecord.getPartitionPath();
            }).collect(Collectors.groupingBy(str -> {
                return str;
            }))).keySet().size(), HoodieSparkWriteableTestTable.of(this.metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, SparkHoodieBackedTableMetadataWriter.create(hoodieWriteClient.getEngineContext().getHadoopConf().get(), build, hoodieWriteClient.getEngineContext())).listAllBaseFiles().length);
            HoodieSparkTable create = HoodieSparkTable.create(build, m43context(), this.metaClient);
            for (String str2 : this.dataGen.getPartitionPaths()) {
                for (FileSlice fileSlice : (List) create.getSliceView().getLatestFileSlices(str2).collect(Collectors.toList())) {
                    org.junit.jupiter.api.Assertions.assertEquals(1L, fileSlice.getLogFiles().count(), "There should be 1 log file written for the latest data file - " + fileSlice);
                }
            }
            String obj = hoodieWriteClient.scheduleCompaction(Option.empty()).get().toString();
            HoodieWriteMetadata compact = hoodieWriteClient.compact(obj);
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            HoodieSparkTable create2 = HoodieSparkTable.create(build, m43context(), this.metaClient);
            org.junit.jupiter.api.Assertions.assertTrue(HoodieTimeline.compareTimestamps(((HoodieInstant) this.metaClient.getActiveTimeline().lastInstant().get()).getTimestamp(), HoodieTimeline.GREATER_THAN, "101"), "Compaction commit should be > than last insert");
            for (String str3 : this.dataGen.getPartitionPaths()) {
                Iterator it = ((List) create2.getSliceView().getLatestFileSlices(str3).collect(Collectors.toList())).iterator();
                while (it.hasNext()) {
                    org.junit.jupiter.api.Assertions.assertEquals(0L, ((FileSlice) it.next()).getLogFiles().count(), "After compaction there should be no log files visible on a full view");
                }
                org.junit.jupiter.api.Assertions.assertTrue(((HoodieCommitMetadata) compact.getCommitMetadata().get()).getWritePartitionPaths().stream().anyMatch(str4 -> {
                    return str4.contentEquals(str3);
                }));
            }
            String[] strArr = new String[this.dataGen.getPartitionPaths().length];
            for (int i = 0; i < strArr.length; i++) {
                strArr[i] = String.format("%s/%s/*", basePath(), this.dataGen.getPartitionPaths()[i]);
            }
            List collectAsList = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), strArr).collectAsList();
            org.junit.jupiter.api.Assertions.assertEquals(generateUpdates.size(), collectAsList.size());
            Iterator it2 = collectAsList.iterator();
            while (it2.hasNext()) {
                org.junit.jupiter.api.Assertions.assertEquals(((Row) it2.next()).getAs("_hoodie_commit_time"), z ? "101" : obj);
            }
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataStatsOnCommit(Boolean bool) throws Exception {
        HoodieWriteConfig build = getConfigBuilder((Boolean) false, bool, HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).build();
        setUp(build.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                HoodieSparkTable create = HoodieSparkTable.create(build, m43context(), this.metaClient);
                HoodieActiveTimeline activeTimeline = create.getActiveTimeline();
                String commitActionType = create.getMetaClient().getCommitActionType();
                HoodieInstant hoodieInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, commitActionType, "000");
                activeTimeline.createNewInstant(hoodieInstant);
                activeTimeline.transitionRequestedToInflight(hoodieInstant, Option.empty());
                activeTimeline.saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActionType, "000"), Option.empty());
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = this.dataGen.generateInserts("001", 200);
                org.junit.jupiter.api.Assertions.assertTrue(hoodieWriteClient.commit("001", hoodieWriteClient.insert(jsc().parallelize(generateInserts, 1), "001")), "Commit should succeed");
                HoodieSparkTable create2 = HoodieSparkTable.create(build, m43context());
                int i = 0;
                Iterator it = ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create2.getActiveTimeline().getInstantDetails((HoodieInstant) create2.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class)).getPartitionToWriteStats().entrySet().iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((List) ((Map.Entry) it.next()).getValue()).iterator();
                    while (it2.hasNext()) {
                        i = (int) (i + ((HoodieWriteStat) it2.next()).getNumInserts());
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(200, i);
                hoodieWriteClient.startCommitWithTime("002");
                int i2 = 0;
                int i3 = 0;
                for (WriteStatus writeStatus : hoodieWriteClient.upsert(jsc().parallelize(this.dataGen.generateUpdates("002", generateInserts), 1), "002").collect()) {
                    i2 = (int) (i2 + writeStatus.getStat().getNumInserts());
                    i3 = (int) (i3 + writeStatus.getStat().getNumUpdateWrites());
                }
                org.junit.jupiter.api.Assertions.assertEquals(0, i2);
                org.junit.jupiter.api.Assertions.assertEquals(200, i3);
                hoodieWriteClient.rollback("002");
                HoodieSparkTable create3 = HoodieSparkTable.create(build, m43context());
                int i4 = 0;
                int i5 = 0;
                Iterator it3 = ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create3.getActiveTimeline().getInstantDetails((HoodieInstant) create3.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class)).getPartitionToWriteStats().entrySet().iterator();
                while (it3.hasNext()) {
                    for (HoodieWriteStat hoodieWriteStat : (List) ((Map.Entry) it3.next()).getValue()) {
                        i4 = (int) (i4 + hoodieWriteStat.getNumInserts());
                        i5 = (int) (i5 + hoodieWriteStat.getNumUpdateWrites());
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(200, i4);
                org.junit.jupiter.api.Assertions.assertEquals(0, i5);
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testRollingStatsWithSmallFileHandling() throws Exception {
        HoodieWriteConfig build = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).build();
        setUp(build.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                HashMap hashMap = new HashMap();
                HashMap hashMap2 = new HashMap();
                hoodieWriteClient.startCommitWithTime("000");
                List generateInserts = this.dataGen.generateInserts("000", 200);
                org.junit.jupiter.api.Assertions.assertTrue(hoodieWriteClient.commit("000", hoodieWriteClient.insert(jsc().parallelize(generateInserts, 1), "000")), "Commit should succeed");
                HoodieSparkTable create = HoodieSparkTable.create(build, m43context());
                int i = 0;
                Iterator it = ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create.getActiveTimeline().getInstantDetails((HoodieInstant) create.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class)).getPartitionToWriteStats().entrySet().iterator();
                while (it.hasNext()) {
                    for (HoodieWriteStat hoodieWriteStat : (List) ((Map.Entry) it.next()).getValue()) {
                        i = (int) (i + hoodieWriteStat.getNumInserts());
                        hashMap.put(hoodieWriteStat.getFileId(), Long.valueOf(hoodieWriteStat.getNumInserts()));
                        hashMap2.put(hoodieWriteStat.getFileId(), Long.valueOf(hoodieWriteStat.getNumUpdateWrites()));
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(200, i);
                hoodieWriteClient.startCommitWithTime("001");
                List generateUpdates = this.dataGen.generateUpdates("001", generateInserts);
                generateUpdates.addAll(this.dataGen.generateInserts("001", 200));
                org.junit.jupiter.api.Assertions.assertTrue(hoodieWriteClient.commit("001", hoodieWriteClient.upsert(jsc().parallelize(generateUpdates, 1), "001")), "Commit should succeed");
                HoodieSparkTable create2 = HoodieSparkTable.create(build, m43context());
                HoodieCommitMetadata hoodieCommitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create2.getActiveTimeline().getInstantDetails((HoodieInstant) create2.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class);
                int i2 = 0;
                int i3 = 0;
                Iterator it2 = hoodieCommitMetadata.getPartitionToWriteStats().entrySet().iterator();
                while (it2.hasNext()) {
                    for (HoodieWriteStat hoodieWriteStat2 : (List) ((Map.Entry) it2.next()).getValue()) {
                        org.junit.jupiter.api.Assertions.assertTrue(hashMap.containsKey(hoodieWriteStat2.getFileId()));
                        org.junit.jupiter.api.Assertions.assertTrue(hashMap2.containsKey(hoodieWriteStat2.getFileId()));
                        i2 = (int) (i2 + hoodieWriteStat2.getNumInserts());
                        i3 = (int) (i3 + hoodieWriteStat2.getNumUpdateWrites());
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(200, i2);
                org.junit.jupiter.api.Assertions.assertEquals(200, i3);
                hoodieWriteClient.scheduleCompactionAtInstant("002", Option.of(hoodieCommitMetadata.getExtraMetadata()));
                HoodieWriteMetadata compact = hoodieWriteClient.compact("002");
                hoodieWriteClient.commitCompaction("002", (HoodieCommitMetadata) compact.getCommitMetadata().get(), Option.empty());
                HoodieSparkTable create3 = HoodieSparkTable.create(build, m43context());
                HoodieCommitMetadata hoodieCommitMetadata2 = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create3.getActiveTimeline().getInstantDetails((HoodieInstant) create3.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class);
                for (Map.Entry entry : hoodieCommitMetadata.getPartitionToWriteStats().entrySet()) {
                    org.junit.jupiter.api.Assertions.assertTrue(hoodieCommitMetadata2.getPartitionToWriteStats().containsKey(entry.getKey()));
                    org.junit.jupiter.api.Assertions.assertEquals(((List) hoodieCommitMetadata2.getPartitionToWriteStats().get(entry.getKey())).size(), ((List) entry.getValue()).size());
                }
                hoodieWriteClient.startCommitWithTime("003");
                List generateUpdates2 = this.dataGen.generateUpdates("003", generateUpdates);
                generateUpdates2.addAll(this.dataGen.generateInserts("003", 200));
                org.junit.jupiter.api.Assertions.assertTrue(hoodieWriteClient.commit("003", hoodieWriteClient.upsert(jsc().parallelize(generateUpdates2, 1), "003")), "Commit should succeed");
                HoodieSparkTable create4 = HoodieSparkTable.create(build, m43context());
                int i4 = 0;
                int i5 = 0;
                Iterator it3 = ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) create4.getActiveTimeline().getInstantDetails((HoodieInstant) create4.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class)).getPartitionToWriteStats().entrySet().iterator();
                while (it3.hasNext()) {
                    for (HoodieWriteStat hoodieWriteStat3 : (List) ((Map.Entry) it3.next()).getValue()) {
                        org.junit.jupiter.api.Assertions.assertTrue(hashMap.containsKey(hoodieWriteStat3.getFileId()));
                        i4 = (int) (i4 + hoodieWriteStat3.getNumInserts());
                        i5 = (int) (i5 + hoodieWriteStat3.getNumUpdateWrites());
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(200, i4);
                org.junit.jupiter.api.Assertions.assertEquals(400, i5);
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testHandleUpdateWithMultiplePartitions() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        setUp(config.getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(config);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = this.dataGen.generateInserts("001", 20);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "001").collect());
                HoodieSparkMergeOnReadTable create = HoodieSparkTable.create(config, m43context(), this.metaClient);
                Option firstInstant = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
                org.junit.jupiter.api.Assertions.assertTrue(firstInstant.isPresent());
                org.junit.jupiter.api.Assertions.assertEquals("001", ((HoodieInstant) firstInstant.get()).getTimestamp(), "Delta commit should be 001");
                org.junit.jupiter.api.Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
                FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
                org.junit.jupiter.api.Assertions.assertFalse(getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent());
                org.junit.jupiter.api.Assertions.assertTrue(getHoodieTableFileSystemView(this.metaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent(), "should list the base files we wrote in the delta commit");
                hoodieWriteClient.startCommitWithTime("002");
                this.metaClient.reloadActiveTimeline();
                List generateUpdates = this.dataGen.generateUpdates("002", generateInserts);
                List collect = hoodieWriteClient.upsert(jsc().parallelize(generateUpdates, 1), "002").collect();
                Assertions.assertNoWriteErrors(collect);
                String partitionPath = ((HoodieRecord) generateUpdates.get(0)).getPartitionPath();
                String fileId = ((WriteStatus) collect.get(0)).getFileId();
                hoodieWriteClient.startCommitWithTime("004");
                this.metaClient.reloadActiveTimeline();
                List generateDeletesFromExistingRecords = this.dataGen.generateDeletesFromExistingRecords(generateUpdates);
                JavaRDD parallelize = jsc().parallelize(generateDeletesFromExistingRecords, 1);
                create.getHoodieView().sync();
                SparkDeleteDeltaCommitActionExecutor sparkDeleteDeltaCommitActionExecutor = new SparkDeleteDeltaCommitActionExecutor(m43context(), config, create, "004", HoodieJavaRDD.of(parallelize));
                sparkDeleteDeltaCommitActionExecutor.getUpsertPartitioner(new WorkloadProfile(HoodieClientTestHarness.buildProfile(parallelize)));
                WriteStatus writeStatus = (WriteStatus) ((List) jsc().parallelize(Arrays.asList(1)).map(num -> {
                    return sparkDeleteDeltaCommitActionExecutor.handleUpdate(partitionPath, fileId, generateDeletesFromExistingRecords.iterator());
                }).map(Transformations::flatten).collect().get(0)).get(0);
                org.junit.jupiter.api.Assertions.assertTrue(writeStatus.hasErrors());
                org.junit.jupiter.api.Assertions.assertEquals(generateDeletesFromExistingRecords.size() - generateDeletesFromExistingRecords.stream().filter(hoodieRecord -> {
                    return hoodieRecord.getPartitionPath().equals(partitionPath);
                }).count(), writeStatus.getTotalErrorRecords());
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testReleaseResource() throws Exception {
        Throwable th;
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder(true);
        configBuilder.withReleaseResourceEnabled(true);
        configBuilder.withAutoCommit(false);
        setUp(configBuilder.build().getProps());
        SparkRDDWriteClient hoodieWriteClient = m44getHoodieWriteClient(configBuilder.build());
        Throwable th2 = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                JavaRDD parallelize = jsc().parallelize(this.dataGen.generateInserts("001", 20), 1);
                parallelize.persist(StorageLevel.MEMORY_AND_DISK());
                List collect = hoodieWriteClient.upsert(parallelize, "001").collect();
                Assertions.assertNoWriteErrors(collect);
                hoodieWriteClient.commitStats("001", (List) collect.stream().map((v0) -> {
                    return v0.getStat();
                }).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
                org.junit.jupiter.api.Assertions.assertEquals(spark().sparkContext().persistentRdds().size(), 0);
                if (hoodieWriteClient != null) {
                    if (0 != 0) {
                        try {
                            hoodieWriteClient.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        hoodieWriteClient.close();
                    }
                }
                configBuilder.withReleaseResourceEnabled(false);
                hoodieWriteClient = m44getHoodieWriteClient(configBuilder.build());
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    hoodieWriteClient.startCommitWithTime("002");
                    JavaRDD parallelize2 = jsc().parallelize(this.dataGen.generateInserts("002", 20), 1);
                    parallelize2.persist(StorageLevel.MEMORY_AND_DISK());
                    List collect2 = hoodieWriteClient.upsert(parallelize2, "002").collect();
                    Assertions.assertNoWriteErrors(collect2);
                    hoodieWriteClient.commitStats("002", (List) collect2.stream().map((v0) -> {
                        return v0.getStat();
                    }).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
                    org.junit.jupiter.api.Assertions.assertTrue(spark().sparkContext().persistentRdds().size() > 0);
                    if (hoodieWriteClient != null) {
                        if (0 == 0) {
                            hoodieWriteClient.close();
                            return;
                        }
                        try {
                            hoodieWriteClient.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -778804732:
                if (implMethodName.equals("flatten")) {
                    z = false;
                    break;
                }
                break;
            case -338322447:
                if (implMethodName.equals("lambda$testHandleUpdateWithMultiplePartitions$7cc9a839$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/List;")) {
                    return Transformations::flatten;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/TestHoodieMergeOnReadTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkDeltaCommitActionExecutor baseSparkDeltaCommitActionExecutor = (BaseSparkDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    String str2 = (String) serializedLambda.getCapturedArg(2);
                    List list = (List) serializedLambda.getCapturedArg(3);
                    return num -> {
                        return baseSparkDeltaCommitActionExecutor.handleUpdate(str, str2, list.iterator());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
