package org.apache.hudi.table.functional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.class */
public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testCOWToMORConvertedTableRollback(boolean z) throws Exception {
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
        HoodieWriteConfig config = getConfig(false, Boolean.valueOf(z));
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(config);
        Throwable th = null;
        try {
            try {
                HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = hoodieTestDataGenerator.generateInserts("001", 200);
                List collect = hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "001").collect();
                Assertions.assertNoWriteErrors(collect);
                hoodieWriteClient.commit("001", jsc().parallelize(collect));
                Option firstInstant = HoodieTableMetaClient.reload(hoodieMetaClient).getActiveTimeline().getCommitTimeline().firstInstant();
                org.junit.jupiter.api.Assertions.assertTrue(firstInstant.isPresent());
                org.junit.jupiter.api.Assertions.assertEquals("001", ((HoodieInstant) firstInstant.get()).getTimestamp(), "commit should be 001");
                String str = "002";
                hoodieWriteClient.startCommitWithTime("002");
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(hoodieTestDataGenerator.generateUpdates("002", generateInserts), 1), "002").collect());
                HoodieTableMetaClient hoodieMetaClient2 = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
                hoodieWriteClient.rollback("002");
                HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient2);
                HoodieSparkTable create = HoodieSparkTable.create(config, m54context(), reload);
                org.junit.jupiter.api.Assertions.assertAll(getHoodieTableFileSystemView(reload, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath(create)).getLatestBaseFiles().map(hoodieBaseFile -> {
                    return () -> {
                        org.junit.jupiter.api.Assertions.assertNotEquals(str, hoodieBaseFile.getCommitTime());
                    };
                }));
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testRollbackWithDeltaAndCompactionCommit(boolean z) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder((Boolean) false, Boolean.valueOf(z), HoodieIndex.IndexType.SIMPLE);
        addConfigsForPopulateMetaFields(configBuilder, true);
        HoodieWriteConfig build = configBuilder.build();
        Properties copy = CollectionUtils.copy(build.getProps());
        copy.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, copy);
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
            hoodieWriteClient.startCommitWithTime("000000001");
            List generateInserts = hoodieTestDataGenerator.generateInserts("000000001", 200);
            List collect = hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "000000001").collect();
            Assertions.assertNoWriteErrors(collect);
            hoodieWriteClient.commit("000000001", jsc().parallelize(collect));
            HoodieSparkTable create = HoodieSparkTable.create(build, m54context(), hoodieMetaClient);
            Option firstInstant = hoodieMetaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue(firstInstant.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals("000000001", ((HoodieInstant) firstInstant.get()).getTimestamp(), "Delta commit should be 000000001");
            org.junit.jupiter.api.Assertions.assertFalse(hoodieMetaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
            FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
            org.junit.jupiter.api.Assertions.assertFalse(getHoodieTableFileSystemView(hoodieMetaClient, hoodieMetaClient.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent());
            HoodieTableFileSystemView hoodieTableFileSystemView = getHoodieTableFileSystemView(hoodieMetaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath);
            org.junit.jupiter.api.Assertions.assertTrue(hoodieTableFileSystemView.getLatestBaseFiles().findAny().isPresent(), "should list the base files we wrote in the delta commit");
            SparkRDDWriteClient hoodieWriteClient2 = m55getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));
            Throwable th2 = null;
            try {
                try {
                    hoodieWriteClient2.startCommitWithTime("000000002");
                    List generateUpdates = hoodieTestDataGenerator.generateUpdates("000000002", new ArrayList(generateInserts));
                    generateUpdates.addAll(hoodieTestDataGenerator.generateInserts("000000002", 200));
                    org.junit.jupiter.api.Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) hoodieTableFileSystemView.getLatestBaseFiles().map(hoodieBaseFile -> {
                        return new Path(hoodieBaseFile.getPath()).getParent().toString();
                    }).collect(Collectors.toList()), basePath()).size());
                    Assertions.assertNoWriteErrors(hoodieWriteClient2.upsert(jsc().parallelize(generateUpdates, 1), "000000002").collect());
                    hoodieWriteClient2.rollback("000000002");
                    List list = (List) Arrays.stream(listAllBaseFilesInPath(create)).filter(fileStatus -> {
                        return fileStatus.getPath().getName().contains("_000000002");
                    }).map(fileStatus2 -> {
                        return fileStatus2.getPath().toString();
                    }).collect(Collectors.toList());
                    org.junit.jupiter.api.Assertions.assertEquals(0, list.size(), "These files should have been rolled-back when rolling back commit 000000002 but are still remaining. Files: " + list);
                    org.junit.jupiter.api.Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) hoodieTableFileSystemView.getLatestBaseFiles().map(hoodieBaseFile2 -> {
                        return new Path(hoodieBaseFile2.getPath()).getParent().toString();
                    }).collect(Collectors.toList()), basePath()).size());
                    if (hoodieWriteClient2 != null) {
                        if (0 != 0) {
                            try {
                                hoodieWriteClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            hoodieWriteClient2.close();
                        }
                    }
                    SparkRDDWriteClient hoodieWriteClient3 = m55getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));
                    Throwable th4 = null;
                    try {
                        hoodieWriteClient3.startCommitWithTime("000000003");
                        List generateUpdates2 = hoodieTestDataGenerator.generateUpdates("000000003", new ArrayList(generateInserts));
                        generateUpdates2.addAll(hoodieTestDataGenerator.generateInserts("000000003", 200));
                        org.junit.jupiter.api.Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) hoodieTableFileSystemView.getLatestBaseFiles().map(hoodieBaseFile3 -> {
                            return new Path(hoodieBaseFile3.getPath()).getParent().toString();
                        }).collect(Collectors.toList()), basePath()).size());
                        JavaRDD parallelize = jsc().parallelize(generateUpdates2, 1);
                        Assertions.assertNoWriteErrors(hoodieWriteClient3.upsert(parallelize, "000000003").collect());
                        hoodieWriteClient3.rollback("000000003");
                        FileStatus[] listAllBaseFilesInPath2 = listAllBaseFilesInPath(create);
                        List list2 = (List) Arrays.stream(listAllBaseFilesInPath2).filter(fileStatus3 -> {
                            return fileStatus3.getPath().getName().contains("_000000003");
                        }).map(fileStatus4 -> {
                            return fileStatus4.getPath().toString();
                        }).collect(Collectors.toList());
                        org.junit.jupiter.api.Assertions.assertEquals(0, list2.size(), "These files should have been rolled-back when rolling back commit 000000003 but are still remaining. Files: " + list2);
                        HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                        HoodieSparkTable create2 = HoodieSparkTable.create(build, m54context(), reload);
                        org.junit.jupiter.api.Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) getHoodieTableFileSystemView(reload, create2.getCompletedCommitsTimeline(), listAllBaseFilesInPath2).getLatestBaseFiles().map(hoodieBaseFile4 -> {
                            return new Path(hoodieBaseFile4.getPath()).getParent().toString();
                        }).collect(Collectors.toList()), basePath()).size());
                        hoodieWriteClient3.startCommitWithTime("000000004");
                        List collect2 = hoodieWriteClient3.upsert(parallelize, "000000004").collect();
                        Assertions.assertNoWriteErrors(collect2);
                        hoodieWriteClient3.commit("000000004", jsc().parallelize(collect2));
                        HoodieTableMetaClient reload2 = HoodieTableMetaClient.reload(reload);
                        hoodieWriteClient3.compact(hoodieWriteClient3.scheduleCompaction(Option.empty()).get().toString());
                        HoodieTableMetaClient reload3 = HoodieTableMetaClient.reload(reload2);
                        String timestamp = ((HoodieInstant) reload3.getActiveTimeline().reload().lastInstant().get()).getTimestamp();
                        org.junit.jupiter.api.Assertions.assertTrue(Arrays.stream(listAllBaseFilesInPath(create2)).anyMatch(fileStatus5 -> {
                            return timestamp.equals(new HoodieBaseFile(fileStatus5).getCommitTime());
                        }));
                        create2.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", timestamp));
                        FileStatus[] listAllBaseFilesInPath3 = listAllBaseFilesInPath(create2);
                        HoodieTableMetaClient reload4 = HoodieTableMetaClient.reload(reload3);
                        HoodieTableFileSystemView hoodieTableFileSystemView2 = getHoodieTableFileSystemView(reload4, reload4.getCommitsTimeline(), listAllBaseFilesInPath3);
                        org.junit.jupiter.api.Assertions.assertFalse(hoodieTableFileSystemView2.getLatestBaseFiles().anyMatch(hoodieBaseFile5 -> {
                            return timestamp.equals(hoodieBaseFile5.getCommitTime());
                        }));
                        org.junit.jupiter.api.Assertions.assertAll(hoodieTableFileSystemView2.getLatestBaseFiles().map(hoodieBaseFile6 -> {
                            return () -> {
                                org.junit.jupiter.api.Assertions.assertNotEquals(timestamp, hoodieBaseFile6.getCommitTime());
                            };
                        }));
                        if (hoodieWriteClient3 != null) {
                            if (0 != 0) {
                                try {
                                    hoodieWriteClient3.close();
                                } catch (Throwable th5) {
                                    th4.addSuppressed(th5);
                                }
                            } else {
                                hoodieWriteClient3.close();
                            }
                        }
                        if (hoodieWriteClient != null) {
                            if (0 == 0) {
                                hoodieWriteClient.close();
                                return;
                            }
                            try {
                                hoodieWriteClient.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        if (hoodieWriteClient3 != null) {
                            if (0 != 0) {
                                try {
                                    hoodieWriteClient3.close();
                                } catch (Throwable th8) {
                                    th4.addSuppressed(th8);
                                }
                            } else {
                                hoodieWriteClient3.close();
                            }
                        }
                        throw th7;
                    }
                } catch (Throwable th9) {
                    th2 = th9;
                    throw th9;
                }
            } catch (Throwable th10) {
                if (hoodieWriteClient2 != null) {
                    if (th2 != null) {
                        try {
                            hoodieWriteClient2.close();
                        } catch (Throwable th11) {
                            th2.addSuppressed(th11);
                        }
                    } else {
                        hoodieWriteClient2.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th12;
        }
    }

    @Test
    void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
        HoodieWriteConfig.Builder withMarkersType = getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name());
        addConfigsForPopulateMetaFields(withMarkersType, true);
        HoodieWriteConfig build = withMarkersType.build();
        Properties propertiesForKeyGen = getPropertiesForKeyGen(true);
        propertiesForKeyGen.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, propertiesForKeyGen);
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
            hoodieWriteClient.startCommitWithTime("001");
            List generateInserts = hoodieTestDataGenerator.generateInserts("001", 200);
            List collect = hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "001").collect();
            Assertions.assertNoWriteErrors(collect);
            hoodieWriteClient.commit("001", jsc().parallelize(collect));
            hoodieWriteClient.close();
            Option lastCommitMetadataWithValidData = hoodieMetaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
            org.junit.jupiter.api.Assertions.assertTrue(lastCommitMetadataWithValidData.isPresent());
            HoodieInstant hoodieInstant = (HoodieInstant) ((Pair) lastCommitMetadataWithValidData.get()).getKey();
            org.junit.jupiter.api.Assertions.assertEquals("001", hoodieInstant.getTimestamp());
            org.junit.jupiter.api.Assertions.assertEquals("deltacommit", hoodieInstant.getAction());
            org.junit.jupiter.api.Assertions.assertEquals(200L, getTotalRecordsWritten((HoodieCommitMetadata) ((Pair) lastCommitMetadataWithValidData.get()).getValue()));
            org.junit.jupiter.api.Assertions.assertFalse(hoodieMetaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
            HoodieSparkTable create = HoodieSparkTable.create(build, m54context(), hoodieMetaClient);
            FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
            org.junit.jupiter.api.Assertions.assertFalse(getHoodieTableFileSystemView(hoodieMetaClient, hoodieMetaClient.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent());
            HoodieTableFileSystemView hoodieTableFileSystemView = getHoodieTableFileSystemView(hoodieMetaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath);
            org.junit.jupiter.api.Assertions.assertTrue(hoodieTableFileSystemView.getLatestBaseFiles().findAny().isPresent(), "Should list the base files we wrote in the delta commit");
            SparkRDDWriteClient hoodieWriteClient2 = m55getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOffBuilder(true).withMarkersType(MarkerType.DIRECT.name()).build());
            Throwable th2 = null;
            try {
                try {
                    hoodieWriteClient2.startCommitWithTime("002");
                    List generateUpdates = hoodieTestDataGenerator.generateUpdates("002", new ArrayList(generateInserts));
                    generateUpdates.addAll(hoodieTestDataGenerator.generateInserts("002", 200));
                    org.junit.jupiter.api.Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) hoodieTableFileSystemView.getLatestBaseFiles().map(hoodieBaseFile -> {
                        return new Path(hoodieBaseFile.getPath()).getParent().toString();
                    }).collect(Collectors.toList()), basePath()).size());
                    List collect2 = hoodieWriteClient2.upsert(jsc().parallelize(generateUpdates, 1), "002").collect();
                    Assertions.assertNoWriteErrors(collect2);
                    hoodieWriteClient2.commit("002", jsc().parallelize(collect2));
                    generateUpdates.clear();
                    if (hoodieWriteClient2 != null) {
                        if (0 != 0) {
                            try {
                                hoodieWriteClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            hoodieWriteClient2.close();
                        }
                    }
                    hoodieWriteClient.startCommitWithTime("003");
                    List generateInserts2 = hoodieTestDataGenerator.generateInserts("003", 100);
                    List generateUpdates2 = hoodieTestDataGenerator.generateUpdates("003", generateInserts);
                    generateUpdates2.addAll(generateInserts2);
                    List collect3 = hoodieWriteClient.upsert(jsc().parallelize(generateUpdates2, 1), "003").collect();
                    Assertions.assertNoWriteErrors(collect3);
                    hoodieWriteClient.commit("003", jsc().parallelize(collect3));
                    HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                    hoodieWriteClient.scheduleCompactionAtInstant("004", Option.empty());
                    hoodieWriteClient.startCommitWithTime("005");
                    List generateUpdates3 = hoodieTestDataGenerator.generateUpdates("005", generateUpdates2);
                    List collect4 = hoodieWriteClient.upsert(jsc().parallelize(generateUpdates3, 1), "005").collect();
                    Assertions.assertNoWriteErrors(collect4);
                    hoodieWriteClient.commit("005", jsc().parallelize(collect4));
                    HoodieTableMetaClient reload2 = HoodieTableMetaClient.reload(reload);
                    hoodieWriteClient.scheduleCompactionAtInstant("006", Option.empty());
                    hoodieWriteClient.commitCompaction("006", (HoodieCommitMetadata) hoodieWriteClient.compact("006").getCommitMetadata().get(), Option.empty());
                    FileStatus[] listAllBaseFilesInPath2 = listAllBaseFilesInPath(create);
                    HoodieTableMetaClient reload3 = HoodieTableMetaClient.reload(reload2);
                    HoodieTableFileSystemView hoodieTableFileSystemView2 = getHoodieTableFileSystemView(reload3, reload3.getCommitsTimeline(), listAllBaseFilesInPath2);
                    String timestamp = ((HoodieInstant) reload3.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()).getTimestamp();
                    org.junit.jupiter.api.Assertions.assertTrue(hoodieTableFileSystemView2.getLatestBaseFiles().anyMatch(hoodieBaseFile2 -> {
                        return timestamp.equals(hoodieBaseFile2.getCommitTime());
                    }));
                    hoodieWriteClient.startCommitWithTime("007");
                    List generateUpdates4 = hoodieTestDataGenerator.generateUpdates("007", new ArrayList(generateUpdates3));
                    generateUpdates4.addAll(hoodieTestDataGenerator.generateInserts("007", 200));
                    List collect5 = hoodieWriteClient.upsert(jsc().parallelize(generateUpdates4, 1), "007").collect();
                    Assertions.assertNoWriteErrors(collect5);
                    hoodieWriteClient.commit("007", jsc().parallelize(collect5));
                    generateUpdates4.clear();
                    hoodieWriteClient.restoreToInstant("003", build.isMetadataTableEnabled());
                    reload3.reloadActiveTimeline();
                    FileStatus[] listAllBaseFilesInPath3 = listAllBaseFilesInPath(create);
                    org.junit.jupiter.api.Assertions.assertFalse(getHoodieTableFileSystemView(reload3, reload3.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(), listAllBaseFilesInPath3).getLatestBaseFiles().filter(hoodieBaseFile3 -> {
                        return hoodieBaseFile3.getCommitTime().compareTo("003") > 0;
                    }).findAny().isPresent());
                    List list = (List) getHoodieTableFileSystemView(reload3, reload3.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath3).getAllFileGroups().collect(Collectors.toList());
                    org.junit.jupiter.api.Assertions.assertFalse(list.isEmpty());
                    org.junit.jupiter.api.Assertions.assertFalse(list.stream().filter(hoodieFileGroup -> {
                        return hoodieFileGroup.getAllFileSlices().map(fileSlice -> {
                            return Boolean.valueOf(fileSlice.getBaseInstantTime().compareTo("003") > 0);
                        }).count() > 0;
                    }).findAny().isPresent());
                    if (hoodieWriteClient != null) {
                        if (0 == 0) {
                            hoodieWriteClient.close();
                            return;
                        }
                        try {
                            hoodieWriteClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (hoodieWriteClient2 != null) {
                    if (th2 != null) {
                        try {
                            hoodieWriteClient2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        hoodieWriteClient2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th8;
        }
    }

    @Test
    void testRestoreWithCleanedUpCommits() throws Exception {
        HoodieWriteConfig.Builder withMarkersType = getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name());
        addConfigsForPopulateMetaFields(withMarkersType, true);
        HoodieWriteConfig build = withMarkersType.build();
        Properties properties = 1 != 0 ? new Properties() : getPropertiesForKeyGen();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
            hoodieWriteClient.startCommitWithTime("001");
            List<HoodieRecord> generateInserts = hoodieTestDataGenerator.generateInserts("001", 200);
            List collect = hoodieWriteClient.upsert(jsc().parallelize(generateInserts, 1), "001").collect();
            Assertions.assertNoWriteErrors(collect);
            hoodieWriteClient.commit("001", jsc().parallelize(collect));
            upsertRecords(hoodieWriteClient, "002", generateInserts, hoodieTestDataGenerator);
            hoodieWriteClient.savepoint("002", "user1", "comment1");
            upsertRecords(hoodieWriteClient, "003", generateInserts, hoodieTestDataGenerator);
            upsertRecords(hoodieWriteClient, "004", generateInserts, hoodieTestDataGenerator);
            hoodieWriteClient.scheduleCompactionAtInstant("006", Option.empty());
            hoodieWriteClient.commitCompaction("006", (HoodieCommitMetadata) hoodieWriteClient.compact("006").getCommitMetadata().get(), Option.empty());
            upsertRecords(hoodieWriteClient, "007", generateInserts, hoodieTestDataGenerator);
            upsertRecords(hoodieWriteClient, "008", generateInserts, hoodieTestDataGenerator);
            hoodieWriteClient.scheduleCompactionAtInstant("009", Option.empty());
            hoodieWriteClient.commitCompaction("009", (HoodieCommitMetadata) hoodieWriteClient.compact("009").getCommitMetadata().get(), Option.empty());
            upsertRecords(hoodieWriteClient, "010", generateInserts, hoodieTestDataGenerator);
            HoodieWriteConfig.Builder withMarkersType2 = getConfigBuilder(false).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withMarkersType(MarkerType.DIRECT.name());
            addConfigsForPopulateMetaFields(withMarkersType2, true);
            SparkRDDWriteClient hoodieWriteClient2 = m55getHoodieWriteClient(withMarkersType2.build());
            Throwable th2 = null;
            try {
                try {
                    hoodieWriteClient2.clean();
                    if (hoodieWriteClient2 != null) {
                        if (0 != 0) {
                            try {
                                hoodieWriteClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            hoodieWriteClient2.close();
                        }
                    }
                    HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                    upsertRecords(hoodieWriteClient, "011", generateInserts, hoodieTestDataGenerator);
                    hoodieWriteClient.restoreToInstant("002", build.isMetadataTableEnabled());
                    org.junit.jupiter.api.Assertions.assertFalse(getHoodieTableFileSystemView(reload, reload.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath(HoodieSparkTable.create(build, m54context(), reload))).getLatestBaseFiles().anyMatch(hoodieBaseFile -> {
                        return HoodieTimeline.compareTimestamps("002", HoodieTimeline.GREATER_THAN, hoodieBaseFile.getCommitTime());
                    }));
                    hoodieWriteClient.deleteSavepoint("002");
                    org.junit.jupiter.api.Assertions.assertFalse(reload.reloadActiveTimeline().getSavePointTimeline().containsInstant("002"));
                    if (hoodieWriteClient != null) {
                        if (0 == 0) {
                            hoodieWriteClient.close();
                            return;
                        }
                        try {
                            hoodieWriteClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (hoodieWriteClient2 != null) {
                    if (th2 != null) {
                        try {
                            hoodieWriteClient2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        hoodieWriteClient2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th8;
        }
    }

    private void upsertRecords(SparkRDDWriteClient sparkRDDWriteClient, String str, List<HoodieRecord> list, HoodieTestDataGenerator hoodieTestDataGenerator) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List collect = sparkRDDWriteClient.upsert(jsc().parallelize(hoodieTestDataGenerator.generateUpdates(str, new ArrayList(list)), 1), str).collect();
        Assertions.assertNoWriteErrors(collect);
        sparkRDDWriteClient.commit(str, jsc().parallelize(collect));
    }

    private long getTotalRecordsWritten(HoodieCommitMetadata hoodieCommitMetadata) {
        return ((Long) hoodieCommitMetadata.getPartitionToWriteStats().values().stream().flatMap((v0) -> {
            return v0.stream();
        }).map(hoodieWriteStat -> {
            return Long.valueOf(hoodieWriteStat.getNumWrites() + hoodieWriteStat.getNumUpdateWrites());
        }).reduce(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })).longValue();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testMORTableRestore(boolean z) throws Exception {
        HoodieWriteConfig build = getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name()).build();
        Properties propertiesForKeyGen = getPropertiesForKeyGen(true);
        propertiesForKeyGen.putAll(build.getProps());
        propertiesForKeyGen.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, propertiesForKeyGen);
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
            List<HoodieRecord> insertAndGetRecords = insertAndGetRecords("001", hoodieWriteClient, hoodieTestDataGenerator, 200);
            List<HoodieRecord> updateAndGetRecords = updateAndGetRecords("002", hoodieWriteClient, hoodieTestDataGenerator, insertAndGetRecords);
            List<HoodieRecord> updateAndGetRecords2 = updateAndGetRecords("003", hoodieWriteClient, hoodieTestDataGenerator, insertAndGetRecords);
            List<HoodieRecord> updateAndGetRecords3 = updateAndGetRecords("004", hoodieWriteClient, hoodieTestDataGenerator, insertAndGetRecords);
            validateRecords(build, hoodieMetaClient, updateAndGetRecords3);
            if (z) {
                HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                hoodieWriteClient.scheduleCompactionAtInstant("005", Option.empty());
                hoodieWriteClient.commitCompaction("005", (HoodieCommitMetadata) hoodieWriteClient.compact("005").getCommitMetadata().get(), Option.empty());
                validateRecords(build, reload, updateAndGetRecords3);
                updateAndGetRecords("006", hoodieWriteClient, hoodieTestDataGenerator, insertAndGetRecords);
                validateRecords(build, reload, updateAndGetRecords("007", hoodieWriteClient, hoodieTestDataGenerator, insertAndGetRecords));
                hoodieWriteClient.restoreToInstant("003", build.isMetadataTableEnabled());
                validateRecords(build, reload, updateAndGetRecords2);
            } else {
                hoodieWriteClient.restoreToInstant("002", build.isMetadataTableEnabled());
                validateRecords(build, hoodieMetaClient, updateAndGetRecords);
            }
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    private List<HoodieRecord> insertAndGetRecords(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieTestDataGenerator hoodieTestDataGenerator, int i) {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> generateInserts = hoodieTestDataGenerator.generateInserts(str, Integer.valueOf(i));
        sparkRDDWriteClient.commit(str, sparkRDDWriteClient.upsert(jsc().parallelize(generateInserts, 1), str));
        return generateInserts;
    }

    private List<HoodieRecord> updateAndGetRecords(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieTestDataGenerator hoodieTestDataGenerator, List<HoodieRecord> list) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> generateUpdates = hoodieTestDataGenerator.generateUpdates(str, list);
        sparkRDDWriteClient.commit(str, sparkRDDWriteClient.upsert(jsc().parallelize(generateUpdates, 1), str));
        return generateUpdates;
    }

    private void validateRecords(HoodieWriteConfig hoodieWriteConfig, HoodieTableMetaClient hoodieTableMetaClient, List<HoodieRecord> list) throws IOException {
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m54context(), hoodieTableMetaClient);
        assertRecords(list, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), (List) getHoodieTableFileSystemView(hoodieTableMetaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath(create)).getLatestBaseFiles().map(hoodieBaseFile -> {
            return new Path(hoodieBaseFile.getPath()).getParent().toString();
        }).collect(Collectors.toList()), basePath()));
    }

    private void assertRecords(List<HoodieRecord> list, List<GenericRecord> list2) {
        org.junit.jupiter.api.Assertions.assertEquals(list2.size(), list.size());
        HashMap hashMap = new HashMap();
        list.forEach(hoodieRecord -> {
            try {
                hashMap.put(hoodieRecord.getRecordKey(), (GenericRecord) ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        HashMap hashMap2 = new HashMap();
        list2.forEach(genericRecord -> {
        });
        for (Map.Entry entry : hashMap.entrySet()) {
            org.junit.jupiter.api.Assertions.assertEquals(String.valueOf(((GenericRecord) entry.getValue()).get("driver")), String.valueOf(((GenericRecord) hashMap2.get(entry.getKey())).get("driver")));
        }
    }

    private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean z) {
        return getHoodieWriteConfigWithSmallFileHandlingOffBuilder(z).build();
    }

    private HoodieWriteConfig.Builder getHoodieWriteConfigWithSmallFileHandlingOffBuilder(boolean z) {
        HoodieWriteConfig.Builder forTable = HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024L).withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withEmbeddedTimelineServerEnabled(true).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024L).parquetMaxFileSize(1024L).build()).forTable("test-trip-table");
        if (!z) {
            addConfigsForPopulateMetaFields(forTable, false);
        }
        return forTable;
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testInsertsGeneratedIntoLogFilesRollback(boolean z) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        HoodieWriteConfig build = getConfigBuilder((Boolean) false, Boolean.valueOf(z), HoodieIndex.IndexType.INMEMORY).build();
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            hoodieWriteClient.startCommitWithTime("100");
            List collect = hoodieWriteClient.insert(jsc().parallelize(hoodieTestDataGenerator.generateInserts("100", 100), 1), "100").collect();
            org.junit.jupiter.api.Assertions.assertEquals(0L, collect.stream().filter(writeStatus -> {
                return !writeStatus.getStat().getPath().contains("log");
            }).count());
            org.junit.jupiter.api.Assertions.assertTrue(collect.stream().anyMatch(writeStatus2 -> {
                return writeStatus2.getStat().getPath().contains("log");
            }));
            org.junit.jupiter.api.Assertions.assertTrue(hoodieWriteClient.rollback("100"));
            hoodieWriteClient.startCommitWithTime("101");
            hoodieWriteClient.insert(jsc().parallelize(hoodieTestDataGenerator.generateInserts("101", 100), 1), "101").collect();
            Thread.sleep(1000L);
            java.nio.file.Path createTempDirectory = Files.createTempDirectory(getClass().getCanonicalName(), new FileAttribute[0]);
            HashMap hashMap = new HashMap();
            Iterator it = Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT).iterator();
            while (it.hasNext()) {
                HoodieInstant hoodieInstant = new HoodieInstant((HoodieInstant.State) it.next(), "deltacommit", "101");
                File file = Files.createTempFile(createTempDirectory, null, null, new FileAttribute[0]).toFile();
                hoodieMetaClient.getFs().copyToLocalFile(new Path(hoodieMetaClient.getMetaPath(), hoodieInstant.getFileName()), new Path(file.getAbsolutePath()));
                hashMap.put(file.getAbsolutePath(), hoodieInstant.getFileName());
            }
            Path path = new Path(Files.createTempDirectory(createTempDirectory, null, new FileAttribute[0]).toAbsolutePath().toString());
            if (z) {
                hoodieMetaClient.getFs().copyToLocalFile(new Path(hoodieMetaClient.getMarkerFolderPath("101")), path);
            }
            hoodieWriteClient.rollback("101");
            HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
            TableFileSystemView.SliceView sliceView = HoodieSparkTable.create(build, m54context()).getSliceView();
            long j = 0;
            for (String str : hoodieTestDataGenerator.getPartitionPaths()) {
                org.junit.jupiter.api.Assertions.assertTrue(sliceView.getLatestFileSlices(str).noneMatch(fileSlice -> {
                    return fileSlice.getBaseFile().isPresent();
                }));
                org.junit.jupiter.api.Assertions.assertTrue(sliceView.getLatestFileSlices(str).noneMatch(fileSlice2 -> {
                    return fileSlice2.getLogFiles().count() > 0;
                }));
                j += sliceView.getLatestFileSlices(str).filter(fileSlice3 -> {
                    return fileSlice3.getLogFiles().count() > 0;
                }).count();
            }
            org.junit.jupiter.api.Assertions.assertEquals(0L, j);
            for (Map.Entry entry : hashMap.entrySet()) {
                try {
                    reload.getFs().copyFromLocalFile(new Path((String) entry.getKey()), new Path(reload.getMetaPath(), (String) entry.getValue()));
                } catch (IOException e) {
                    throw new HoodieIOException("Error copying state from local disk.", e);
                }
            }
            if (z) {
                reload.getFs().copyFromLocalFile(new Path(path, "101"), new Path(reload.getMarkerFolderPath("101")));
            }
            Thread.sleep(1000L);
            hoodieWriteClient.rollback("101");
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(boolean z) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        HoodieWriteConfig build = getConfigBuilder((Boolean) false, Boolean.valueOf(z), HoodieIndex.IndexType.INMEMORY).build();
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("100");
                hoodieWriteClient.commit("100", hoodieWriteClient.insert(jsc().parallelize(hoodieTestDataGenerator.generateInserts("100", 100), 1), "100"));
                HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                HoodieSparkTable create = HoodieSparkTable.create(build, m54context(), reload);
                create.getHoodieView().sync();
                TableFileSystemView.SliceView sliceView = create.getSliceView();
                long j = 0;
                for (String str : hoodieTestDataGenerator.getPartitionPaths()) {
                    org.junit.jupiter.api.Assertions.assertTrue(sliceView.getLatestFileSlices(str).noneMatch(fileSlice -> {
                        return fileSlice.getBaseFile().isPresent();
                    }));
                    org.junit.jupiter.api.Assertions.assertTrue(sliceView.getLatestFileSlices(str).anyMatch(fileSlice2 -> {
                        return fileSlice2.getLogFiles().count() > 0;
                    }));
                    j += sliceView.getLatestFileSlices(str).filter(fileSlice3 -> {
                        return fileSlice3.getLogFiles().count() > 0;
                    }).count();
                }
                org.junit.jupiter.api.Assertions.assertTrue(j > 0);
                String obj = hoodieWriteClient.scheduleCompaction(Option.empty()).get().toString();
                HoodieWriteMetadata compact = hoodieWriteClient.compact(obj);
                String baseFileExtension = create.getBaseFileExtension();
                Collection values = ((HoodieCommitMetadata) compact.getCommitMetadata().get()).getPartitionToWriteStats().values();
                org.junit.jupiter.api.Assertions.assertEquals(j, values.stream().flatMap((v0) -> {
                    return v0.stream();
                }).filter(hoodieWriteStat -> {
                    return hoodieWriteStat.getPath().contains(baseFileExtension);
                }).count());
                org.junit.jupiter.api.Assertions.assertEquals(j, values.stream().mapToLong((v0) -> {
                    return v0.size();
                }).sum());
                create.getActiveTimeline().reload();
                create.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", obj));
                HoodieTableMetaClient reload2 = HoodieTableMetaClient.reload(reload);
                HoodieSparkTable.create(build, m54context(), reload2).getSliceView().reset();
                for (String str2 : hoodieTestDataGenerator.getPartitionPaths()) {
                    List list = (List) getFileSystemViewWithUnCommittedSlices(reload2).getAllFileSlices(str2).filter(fileSlice4 -> {
                        return fileSlice4.getBaseInstantTime().equals("100");
                    }).collect(Collectors.toList());
                    org.junit.jupiter.api.Assertions.assertTrue(list.stream().noneMatch(fileSlice5 -> {
                        return fileSlice5.getBaseFile().isPresent();
                    }));
                    org.junit.jupiter.api.Assertions.assertTrue(list.stream().anyMatch(fileSlice6 -> {
                        return fileSlice6.getLogFiles().count() > 0;
                    }));
                }
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLazyRollbackOfFailedCommit(boolean z) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieWriteConfig writeConfig = getWriteConfig(true, z);
        HoodieWriteConfig writeConfig2 = getWriteConfig(false, z);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SparkRDDWriteClient hoodieWriteClient = m55getHoodieWriteClient(writeConfig);
        Throwable th = null;
        try {
            SparkRDDWriteClient hoodieWriteClient2 = m55getHoodieWriteClient(writeConfig2);
            Throwable th2 = null;
            try {
                try {
                    List<HoodieRecord> insertRecords = insertRecords(hoodieWriteClient, hoodieTestDataGenerator, "001");
                    updateRecords(hoodieWriteClient, hoodieTestDataGenerator, "002", insertRecords, hoodieMetaClient, writeConfig, true);
                    updateRecords(hoodieWriteClient2, hoodieTestDataGenerator, "003", insertRecords, hoodieMetaClient, writeConfig2, false);
                    updateRecords(hoodieWriteClient, hoodieTestDataGenerator, "004", insertRecords, hoodieMetaClient, writeConfig, false);
                    long numLogFilesInLatestFileSlice = getNumLogFilesInLatestFileSlice(hoodieMetaClient, writeConfig, hoodieTestDataGenerator);
                    doCompaction(hoodieWriteClient2, hoodieMetaClient, writeConfig, numLogFilesInLatestFileSlice);
                    long numLogFilesInLatestFileSlice2 = getNumLogFilesInLatestFileSlice(hoodieMetaClient, writeConfig, hoodieTestDataGenerator);
                    org.junit.jupiter.api.Assertions.assertNotEquals(numLogFilesInLatestFileSlice, numLogFilesInLatestFileSlice2);
                    hoodieWriteClient.rollback("003");
                    org.junit.jupiter.api.Assertions.assertEquals(getNumLogFilesInLatestFileSlice(hoodieMetaClient, writeConfig, hoodieTestDataGenerator), numLogFilesInLatestFileSlice2);
                    if (hoodieWriteClient2 != null) {
                        if (0 != 0) {
                            try {
                                hoodieWriteClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            hoodieWriteClient2.close();
                        }
                    }
                    if (hoodieWriteClient != null) {
                        if (0 == 0) {
                            hoodieWriteClient.close();
                            return;
                        }
                        try {
                            hoodieWriteClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (hoodieWriteClient2 != null) {
                    if (th2 != null) {
                        try {
                            hoodieWriteClient2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        hoodieWriteClient2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th8;
        }
    }

    private List<HoodieRecord> insertRecords(SparkRDDWriteClient sparkRDDWriteClient, HoodieTestDataGenerator hoodieTestDataGenerator, String str) {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> generateInserts = hoodieTestDataGenerator.generateInserts(str, 20);
        Assertions.assertNoWriteErrors(sparkRDDWriteClient.upsert(jsc().parallelize(generateInserts, 1), str).collect());
        return generateInserts;
    }

    private List<HoodieRecord> updateRecords(SparkRDDWriteClient sparkRDDWriteClient, HoodieTestDataGenerator hoodieTestDataGenerator, String str, List<HoodieRecord> list, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, boolean z) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List<HoodieRecord> generateUpdates = hoodieTestDataGenerator.generateUpdates(str, list);
        Assertions.assertNoWriteErrors(sparkRDDWriteClient.upsert(jsc().parallelize(generateUpdates, 1), str).collect());
        if (z) {
            HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m54context(), hoodieTableMetaClient);
            create.getHoodieView().sync();
            TableFileSystemView.SliceView sliceView = create.getSliceView();
            long j = 0;
            for (String str2 : hoodieTestDataGenerator.getPartitionPaths()) {
                List list2 = (List) sliceView.getLatestFileSlices(str2).collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertEquals(1L, list2.stream().filter(fileSlice -> {
                    return fileSlice.getBaseFile().isPresent();
                }).count());
                org.junit.jupiter.api.Assertions.assertTrue(list2.stream().anyMatch(fileSlice2 -> {
                    return fileSlice2.getLogFiles().count() > 0;
                }));
                j += list2.stream().filter(fileSlice3 -> {
                    return fileSlice3.getLogFiles().count() > 0;
                }).count();
            }
            org.junit.jupiter.api.Assertions.assertTrue(j > 0);
        }
        return generateUpdates;
    }

    private long doCompaction(SparkRDDWriteClient sparkRDDWriteClient, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, long j) throws IOException {
        String obj = sparkRDDWriteClient.scheduleCompaction(Option.empty()).get().toString();
        HoodieWriteMetadata compact = sparkRDDWriteClient.compact(obj);
        hoodieTableMetaClient.reloadActiveTimeline();
        String baseFileExtension = HoodieSparkTable.create(hoodieWriteConfig, m54context(), hoodieTableMetaClient).getBaseFileExtension();
        Collection values = ((HoodieCommitMetadata) compact.getCommitMetadata().get()).getPartitionToWriteStats().values();
        org.junit.jupiter.api.Assertions.assertEquals(j, values.stream().flatMap((v0) -> {
            return v0.stream();
        }).filter(hoodieWriteStat -> {
            return hoodieWriteStat.getPath().contains(baseFileExtension);
        }).count());
        org.junit.jupiter.api.Assertions.assertEquals(j, values.stream().mapToLong((v0) -> {
            return v0.size();
        }).sum());
        sparkRDDWriteClient.commitCompaction(obj, (HoodieCommitMetadata) compact.getCommitMetadata().get(), Option.empty());
        return j;
    }

    private long getNumLogFilesInLatestFileSlice(HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, HoodieTestDataGenerator hoodieTestDataGenerator) {
        hoodieTableMetaClient.reloadActiveTimeline();
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m54context(), hoodieTableMetaClient);
        create.getHoodieView().sync();
        TableFileSystemView.SliceView sliceView = create.getSliceView();
        long j = 0;
        for (String str : hoodieTestDataGenerator.getPartitionPaths()) {
            j += ((List) sliceView.getLatestFileSlices(str).collect(Collectors.toList())).stream().filter(fileSlice -> {
                return fileSlice.getLogFiles().count() > 0;
            }).count();
        }
        return j;
    }

    private HoodieWriteConfig getWriteConfig(boolean z, boolean z2) {
        return getConfigBuilder(Boolean.valueOf(z)).withRollbackUsingMarkers(z2).withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1073741824L).withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(3).build()).build();
    }

    private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient hoodieTableMetaClient) {
        try {
            return new HoodieTableFileSystemView(hoodieTableMetaClient, hoodieTableMetaClient.getActiveTimeline(), HoodieTestTable.of(hoodieTableMetaClient).listAllBaseAndLogFiles());
        } catch (IOException e) {
            throw new HoodieIOException("Error getting file system view", e);
        }
    }
}
