package org.apache.hudi.table;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/table/TestHoodieMergeOnReadTable.class */
public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
    private JobConf roSnapshotJobConf;
    private JobConf roJobConf;
    private JobConf rtJobConf;

    @TempDir
    public Path tempFolder;
    private HoodieFileFormat baseFileFormat;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.table.TestHoodieMergeOnReadTable$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/table/TestHoodieMergeOnReadTable$1.class */
    // Decompiler-emitted holder for a lookup table keyed by HoodieFileFormat ordinal.
    // NOTE(review): this looks like the synthetic class javac generates for a switch over the enum;
    // the code that reads the table is not visible in this part of the file -- confirm before removing.
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per HoodieFileFormat constant; slots that are never assigned below stay at 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat = new int[HoodieFileFormat.values().length];

        static {
            // Each constant is registered in its own try block so that a failure to resolve one
            // constant does not prevent the other from being registered.
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[HoodieFileFormat.PARQUET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the PARQUET slot simply keeps its default value of 0.
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[HoodieFileFormat.HFILE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the HFILE slot simply keeps its default value of 0.
            }
        }
    }

    /**
     * Brings up the DFS and Spark fixtures and initialises a MERGE_ON_READ table at the test base path.
     *
     * @param hoodieFileFormat base file format remembered by this test and written into the table properties
     * @param z when {@code true} the table properties start from an empty {@link Properties};
     *          otherwise they start from {@code getPropertiesForKeyGen()}
     * @throws IOException if any of the file-system set-up steps fails
     */
    public void init(HoodieFileFormat hoodieFileFormat, boolean z) throws IOException {
        baseFileFormat = hoodieFileFormat;

        // Cluster and Spark fixtures; both Hadoop configurations pick up the DFS settings.
        initDFS();
        initSparkContexts("TestHoodieMergeOnReadTable");
        hadoopConf.addResource(dfs.getConf());
        jsc.hadoopConfiguration().addResource(dfs.getConf());
        context = new HoodieSparkEngineContext(jsc);

        initPath();
        dfs.mkdirs(new org.apache.hadoop.fs.Path(basePath));

        Properties tableProps;
        if (z) {
            tableProps = new Properties();
        } else {
            tableProps = getPropertiesForKeyGen();
        }
        tableProps.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), hoodieFileFormat.toString());
        metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, tableProps);
        initTestDataGenerator();

        // Separate job configurations for the three read paths exercised by the tests.
        roSnapshotJobConf = new JobConf(hadoopConf);
        roJobConf = new JobConf(hadoopConf);
        rtJobConf = new JobConf(hadoopConf);
    }

    /**
     * Default per-test set-up: initialises the table with the configured default base file format
     * and {@code true} for the second {@code init} argument.
     *
     * @throws IOException if the underlying set-up fails
     */
    @BeforeEach
    public void init() throws IOException {
        HoodieFileFormat defaultFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        init(defaultFormat, true);
    }

    /**
     * Per-test tear-down; also invoked directly by tests that re-initialise the table with
     * different settings.
     *
     * @throws IOException if releasing the resources fails
     */
    @AfterEach
    public void clean() throws IOException {
        this.cleanupResources();
    }

    /**
     * Argument source for tests parameterised on a single boolean: yields {@code (true)} and {@code (false)}.
     *
     * @return one {@link Arguments} instance per parameter row
     */
    private static Stream<Arguments> populateMetaFieldsParams() {
        // A two-dimensional array is required here: each inner array is one row of test arguments.
        Boolean[][] rows = {{true}, {false}};
        // The cast makes it explicit that a row is spread into the varargs, not passed as one argument.
        return Arrays.stream(rows).map(row -> Arguments.of((Object[]) row));
    }

    /**
     * Argument source for tests parameterised on two booleans: yields all four combinations, in the
     * order {@code (true, true)}, {@code (false, true)}, {@code (true, false)}, {@code (false, false)}.
     *
     * @return one {@link Arguments} instance per parameter row
     */
    private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
        // A two-dimensional array is required here: each inner array is one row of test arguments.
        Boolean[][] rows = {{true, true}, {false, true}, {true, false}, {false, false}};
        // The cast makes it explicit that a row is spread into the varargs, not passed as one argument.
        return Arrays.stream(rows).map(row -> Arguments.of((Object[]) row));
    }

    /**
     * Inserts 200 records at instant 001, updates 100 at instant 004, schedules and runs a compaction,
     * then checks that base files exist, that exactly one completed commit follows instant 000 and
     * that 200 records are readable.
     *
     * @param z forwarded to {@code init} and {@code addConfigsForPopulateMetaFields}
     * @throws Exception if set-up, a write, or an assertion fails
     */
    @ParameterizedTest
    @MethodSource({"populateMetaFieldsParams"})
    public void testSimpleInsertAndUpdate(boolean z) throws Exception {
        clean();
        init((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), z);
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        // try-with-resources closes the client on every path and keeps a close() failure from
        // replacing the exception that actually failed the test.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            client.startCommitWithTime("001");
            insertRecords(dataGen.generateInserts("001", 200), client, cfg, "001");

            client.startCommitWithTime("004");
            updateRecords(dataGen.generateUpdates("004", 100), client, cfg, "004");

            String compactionInstant = client.scheduleCompaction(Option.empty()).get().toString();
            client.compact(compactionInstant);

            HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
            tableView = getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), listAllBaseFilesInPath(table));
            Assertions.assertTrue(tableView.getLatestBaseFiles().findAny().isPresent());

            // Reload so the timeline reflects the compaction that just ran.
            metaClient = HoodieTableMetaClient.reload(metaClient);
            HoodieTimeline completedCommits = metaClient.getCommitTimeline().filterCompletedInstants();
            Assertions.assertEquals(1, completedCommits.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
            String latestCommitTime = ((HoodieInstant) completedCommits.lastInstant().get()).getTimestamp();
            Assertions.assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCommitTime));

            if (cfg.populateMetaFields()) {
                Assertions.assertEquals(200L, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, completedCommits, Option.of("000")), "Must contain 200 records");
            } else {
                Assertions.assertEquals(200L, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, completedCommits, Option.empty()));
            }
        }
    }

    /**
     * Same insert / update / compact scenario as the parameterised variant, but with the table
     * re-initialised to use {@link HoodieFileFormat#HFILE} base files.
     *
     * @throws Exception if set-up, a write, or an assertion fails
     */
    @Test
    public void testSimpleInsertAndUpdateHFile() throws Exception {
        clean();
        init(HoodieFileFormat.HFILE, true);
        HoodieWriteConfig cfg = getConfigBuilder(true).build();

        // try-with-resources replaces the hand-expanded close/addSuppressed bookkeeping.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            client.startCommitWithTime("001");
            insertRecords(dataGen.generateInserts("001", 200), client, cfg, "001");

            client.startCommitWithTime("004");
            updateRecords(dataGen.generateUpdates("004", 100), client, cfg, "004");

            String compactionInstant = client.scheduleCompaction(Option.empty()).get().toString();
            client.compact(compactionInstant);

            HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
            FileStatus[] baseFiles = listAllBaseFilesInPath(table);
            tableView = getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), baseFiles);
            // The second view is built and discarded; kept so the sequence of calls is unchanged.
            new HoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), baseFiles);
            Assertions.assertTrue(tableView.getLatestBaseFiles().findAny().isPresent());

            // Reload so the timeline reflects the compaction that just ran.
            metaClient = HoodieTableMetaClient.reload(metaClient);
            HoodieTimeline completedCommits = metaClient.getCommitTimeline().filterCompletedInstants();
            Assertions.assertEquals(1, completedCommits.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
            String latestCommitTime = ((HoodieInstant) completedCommits.lastInstant().get()).getTimestamp();
            Assertions.assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCommitTime));
            Assertions.assertEquals(200L, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, completedCommits, Option.of("000")), "Must contain 200 records");
        }
    }

    /**
     * Runs the shared clustering scenario without the extra update commit.
     *
     * @param z forwarded to {@code init} and as the second argument of {@code testClustering}
     * @param z2 forwarded as the third argument of {@code testClustering}
     * @throws Exception if set-up or the scenario fails
     */
    @ParameterizedTest
    @MethodSource({"populateMetaFieldsAndPreserveMetadataParams"})
    public void testSimpleClusteringNoUpdates(boolean z, boolean z2) throws Exception {
        clean();
        HoodieFileFormat defaultFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        init(defaultFormat, z);
        final boolean withUpdates = false;
        testClustering(withUpdates, z, z2);
    }

    /**
     * Runs the shared clustering scenario including the extra update commit.
     *
     * @param z forwarded to {@code init} and as the second argument of {@code testClustering}
     * @param z2 forwarded as the third argument of {@code testClustering}
     * @throws Exception if set-up or the scenario fails
     */
    @ParameterizedTest
    @MethodSource({"populateMetaFieldsAndPreserveMetadataParams"})
    public void testSimpleClusteringWithUpdates(boolean z, boolean z2) throws Exception {
        clean();
        HoodieFileFormat defaultFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        init(defaultFormat, z);
        final boolean withUpdates = true;
        testClustering(withUpdates, z, z2);
    }

    /**
     * Shared clustering scenario: writes 400 records in two insert commits (001, 002), optionally
     * applies 100 updates at 003, schedules and runs clustering, then verifies file counts, the
     * resulting replacecommit instant and the readable record count.
     *
     * @param z when {@code true} an update commit (003) is written before clustering is scheduled
     * @param z2 forwarded to {@code addConfigsForPopulateMetaFields}
     * @param z3 passed to {@code withPreserveHoodieCommitMetadata} on the clustering config
     * @throws Exception if a write or an assertion fails
     */
    private void testClustering(boolean z, boolean z2, boolean z3) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
            .withClusteringMaxNumGroups(10)
            .withClusteringTargetPartitions(0)
            .withInlineClusteringNumCommits(1)
            .withPreserveHoodieCommitMetadata(Boolean.valueOf(z3))
            .build();
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder((Boolean) true, 10L, clusteringConfig);
        addConfigsForPopulateMetaFields(cfgBuilder, z2);
        HoodieWriteConfig cfg = cfgBuilder.build();

        // try-with-resources closes the client on every path and keeps a close() failure from
        // replacing the exception that actually failed the test.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            List<HoodieRecord> inserts = dataGen.generateInserts("001", 400);
            client.startCommitWithTime("001");
            insertRecords(inserts.subList(0, 200), client, cfg, "001");
            client.startCommitWithTime("002");
            insertRecords(inserts.subList(200, 400), client, cfg, "002");

            if (z) {
                client.startCommitWithTime("003");
                updateRecords(dataGen.generateUpdates("003", 100), client, cfg, "003");
            }

            // Two insert commits are expected to leave two base files per partition.
            FileStatus[] baseFilesBeforeClustering = listAllBaseFilesInPath(HoodieSparkTable.create(cfg, context, metaClient));
            Assertions.assertEquals(dataGen.getPartitionPaths().length * 2, baseFilesBeforeClustering.length);

            String clusteringInstant = client.scheduleClustering(Option.empty()).get().toString();
            metaClient = HoodieTableMetaClient.reload(metaClient);
            // Every base file listed above is expected to be part of the pending clustering plan.
            Assertions.assertEquals(baseFilesBeforeClustering.length, HoodieSparkTable.create(cfg, context, metaClient).getFileSystemView().getFileGroupsInPendingClustering().map((v0) -> {
                return v0.getLeft();
            }).count());

            client.cluster(clusteringInstant, true);
            metaClient = HoodieTableMetaClient.reload(metaClient);
            HoodieSparkTable clusteredTable = HoodieSparkTable.create(cfg, context, metaClient);
            // After clustering, one latest base file per partition is expected.
            Assertions.assertEquals(dataGen.getPartitionPaths().length, Arrays.stream(dataGen.getPartitionPaths()).flatMap(partitionPath -> {
                return clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(partitionPath);
            }).count());

            HoodieTimeline completedCommits = metaClient.getCommitTimeline().filterCompletedInstants();
            Assertions.assertEquals(1, completedCommits.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
            HoodieInstant latestInstant = (HoodieInstant) completedCommits.lastInstant().get();
            Assertions.assertEquals(clusteringInstant, latestInstant.getTimestamp());
            Assertions.assertEquals("replacecommit", latestInstant.getAction());

            if (cfg.populateMetaFields()) {
                Assertions.assertEquals(400L, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, completedCommits, Option.of("000")), "Must contain 400 records");
            } else {
                Assertions.assertEquals(400L, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, completedCommits, Option.empty()));
            }
        }
    }

    /**
     * Walks a single partition through insert (001), update (004), a scheduled compaction (005),
     * another insert (006) and finally the compaction run, validating the read-optimized snapshot,
     * read-optimized incremental and realtime incremental file listings after each step.
     *
     * @throws Exception if a write or a validation fails
     */
    @Test
    public void testIncrementalReadsWithCompaction() throws Exception {
        final String partition = "2020/02/20";
        dataGen = new HoodieTestDataGenerator(new String[]{partition});
        HoodieWriteConfig cfg = getConfig(true);

        // try-with-resources replaces the hand-expanded close/addSuppressed bookkeeping.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Step 1: initial insert at 001.
            client.startCommitWithTime("001");
            insertRecords(dataGen.generateInserts("001", 200), client, cfg, "001");
            validateFiles(partition, 1, getROSnapshotFiles(partition), false, roSnapshotJobConf, 200, "001");
            FileStatus[] roIncrementalAfterInsert = getROIncrementalFiles(partition, true);
            validateFiles(partition, 1, roIncrementalAfterInsert, false, roJobConf, 200, "001");
            org.apache.hadoop.fs.Path firstBaseFilePath = roIncrementalAfterInsert[0].getPath();
            FileStatus[] rtIncrementalAfterInsert = getRTIncrementalFiles(partition);
            validateFiles(partition, 1, rtIncrementalAfterInsert, true, rtJobConf, 200, "001");
            Assertions.assertEquals(firstBaseFilePath, rtIncrementalAfterInsert[0].getPath());

            // Step 2: updates at 004.
            client.startCommitWithTime("004");
            updateRecords(dataGen.generateUpdates("004", 100), client, cfg, "004");
            FileStatus[] roIncrementalAfterUpdate = getROIncrementalFiles(partition, false);
            validateFiles(partition, 1, roIncrementalAfterUpdate, false, roJobConf, 200, "001");
            Assertions.assertEquals(firstBaseFilePath, roIncrementalAfterUpdate[0].getPath());
            validateFiles(partition, 1, getRTIncrementalFiles(partition), true, rtJobConf, 200, "001", "004");

            // Step 3: compaction scheduled at 005 but not yet executed.
            client.scheduleCompactionAtInstant("005", Option.empty());
            validateFiles(partition, 1, getROIncrementalFiles(partition, true), false, roJobConf, 200, "001");
            validateFiles(partition, 1, getRTIncrementalFiles(partition), true, rtJobConf, 200, "001", "004");

            // Step 4: second insert at 006 while the compaction is still pending.
            List<HoodieRecord> secondInserts = dataGen.generateInserts("006", 200);
            client.startCommitWithTime("006");
            insertRecords(secondInserts, client, cfg, "006");
            validateFiles(partition, 2, getROSnapshotFiles(partition), false, roSnapshotJobConf, 400, "001", "006");
            FileStatus[] roIncrementalAfterSecondInsert = getROIncrementalFiles(partition, true);
            Assertions.assertEquals(firstBaseFilePath, roIncrementalAfterSecondInsert[0].getPath());
            validateFiles(partition, 1, roIncrementalAfterSecondInsert, false, roJobConf, 200, "001");
            validateFiles(partition, 2, getROIncrementalFiles(partition, false), false, roJobConf, 400, "001", "006");
            validateFiles(partition, 2, getRTIncrementalFiles(partition), true, rtJobConf, 400, "001", "004", "006");

            // Step 5: run the pending compaction.
            client.compact("005");
            validateFiles(partition, 2, getROSnapshotFiles(partition), false, roSnapshotJobConf, 400, "001", "005", "006");
            FileStatus[] roIncrementalAfterCompaction = getROIncrementalFiles(partition, "002", -1, true);
            // assertEquals reports the actual length on failure, unlike assertTrue(len == 2).
            Assertions.assertEquals(2, roIncrementalAfterCompaction.length);
            validateFiles(partition, 2, roIncrementalAfterCompaction, false, roJobConf, 400, "001", "005", "006");
        }
    }

    /**
     * Upserts 200 generated records with {@link MetadataMergeWriteStatus} as the write-status class
     * and checks that the merged metadata holds the {@code InputRecordCount_1506582000} key with a
     * value equal to twice the number of records written.
     *
     * @throws Exception if the write or an assertion fails
     */
    @Test
    public void testMetadataAggregateFromWriteStatus() throws Exception {
        HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();

        // try-with-resources replaces the hand-expanded close/addSuppressed bookkeeping.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            List<HoodieRecord> inserts = dataGen.generateInserts("001", 200);
            JavaRDD insertsRdd = jsc.parallelize(inserts, 1);

            client.startCommitWithTime("001");
            List writeStatuses = client.upsert(insertsRdd, "001").collect();
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);

            Map mergedMetadata = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses);
            Assertions.assertTrue(mergedMetadata.containsKey("InputRecordCount_1506582000"));
            Assertions.assertEquals(String.valueOf(2 * inserts.size()), mergedMetadata.get("InputRecordCount_1506582000"));
        }
    }

    /**
     * Inserts 20 records (001), updates all of them (002), then deletes them (004), and verifies the
     * delta-commit timeline after each phase and that no records remain readable at the end.
     *
     * @param z forwarded to {@code init} and {@code addConfigsForPopulateMetaFields}
     * @throws Exception if set-up, a write, or an assertion fails
     */
    @ParameterizedTest
    @MethodSource({"populateMetaFieldsParams"})
    public void testSimpleInsertUpdateAndDelete(boolean z) throws Exception {
        clean();
        init((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), z);
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        // try-with-resources replaces the hand-expanded close/addSuppressed bookkeeping.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Phase 1: inserts at 001.
            client.startCommitWithTime("001");
            List<HoodieRecord> inserts = dataGen.generateInserts("001", 20);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(jsc.parallelize(inserts, 1), "001").collect());

            metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
            HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
            Option firstDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstDeltaCommit.isPresent());
            Assertions.assertEquals("001", ((HoodieInstant) firstDeltaCommit.get()).getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            FileStatus[] baseFilesAfterInsert = listAllBaseFilesInPath(table);
            // Against the commit timeline no base file is visible ...
            tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), baseFilesAfterInsert);
            Assertions.assertFalse(tableView.getLatestBaseFiles().findAny().isPresent());
            // ... but against the table's completed-commits timeline the same listing is visible.
            tableView = getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), baseFilesAfterInsert);
            Assertions.assertTrue(tableView.getLatestBaseFiles().findAny().isPresent(), "should list the base files we wrote in the delta commit");

            // Phase 2: update every inserted record at 002.
            client.startCommitWithTime("002");
            List updates = dataGen.generateUpdates("002", inserts);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(jsc.parallelize(updates, 1), "002").collect());

            // Phase 3: delete the updated records at 004.
            client.startCommitWithTime("004");
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(jsc.parallelize(dataGen.generateDeletesFromExistingRecords(updates), 1), "004").collect());

            metaClient = HoodieTableMetaClient.reload(metaClient);
            Option lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(lastDeltaCommit.isPresent());
            Assertions.assertEquals("004", ((HoodieInstant) lastDeltaCommit.get()).getTimestamp(), "Latest Delta commit should be 004");
            Assertions.assertFalse(metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            tableView = getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), listAllBaseFilesInPath(table));
            Assertions.assertTrue(tableView.getLatestBaseFiles().findAny().isPresent());
            List latestBaseFilePaths = (List) tableView.getLatestBaseFiles().map((v0) -> {
                return v0.getPath();
            }).collect(Collectors.toList());
            Assertions.assertEquals(0, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, latestBaseFilePaths, basePath, new JobConf(hadoopConf), true, false).size(), "Must contain 0 records");
        }
    }

    /**
     * Writes and commits 001 on a COPY_ON_WRITE table, writes 002 without committing it,
     * re-initialises the table as MERGE_ON_READ, rolls 002 back and verifies that no latest base
     * file carries commit time 002.
     *
     * @param bool forwarded as the second argument of {@code getConfig}
     * @throws Exception if a write, the rollback, or an assertion fails
     */
    private void testCOWToMORConvertedTableRollback(Boolean bool) throws Exception {
        HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
        HoodieWriteConfig cfg = getConfig(false, bool);

        // try-with-resources replaces the hand-expanded close/addSuppressed bookkeeping.
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Committed write at 001.
            client.startCommitWithTime("001");
            List<HoodieRecord> inserts = dataGen.generateInserts("001", 200);
            List writeStatuses = client.upsert(jsc.parallelize(inserts, 1), "001").collect();
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
            client.commit("001", jsc.parallelize(writeStatuses));

            metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
            Option firstCommit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
            Assertions.assertTrue(firstCommit.isPresent());
            Assertions.assertEquals("001", ((HoodieInstant) firstCommit.get()).getTimestamp(), "commit should be 001");

            // Second write at 002; commit() is never called for it before the rollback below.
            final String rolledBackInstant = "002";
            client.startCommitWithTime(rolledBackInstant);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(jsc.parallelize(dataGen.generateUpdates(rolledBackInstant, inserts), 1), rolledBackInstant).collect());

            // Switch the table type, then roll the second write back.
            HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
            client.rollback(rolledBackInstant);

            metaClient = HoodieTableMetaClient.reload(metaClient);
            HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
            tableView = getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), listAllBaseFilesInPath(table));
            Assertions.assertFalse(tableView.getLatestBaseFiles().anyMatch(baseFile -> {
                return rolledBackInstant.equals(baseFile.getCommitTime());
            }));
        }
    }

    /**
     * Runs the COW-to-MOR converted table rollback scenario with the flag set to {@code false}
     * (the "file list" variant, as opposed to the "markers" variant which passes {@code true}).
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testCOWToMORConvertedTableRollbackUsingFileList() throws Exception {
        final boolean useMarkers = false;
        testCOWToMORConvertedTableRollback(useMarkers);
    }

    /**
     * Runs the COW-to-MOR converted table rollback scenario with the flag set to {@code true}
     * (the "markers" variant, as opposed to the "file list" variant which passes {@code false}).
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testCOWToMORConvertedTableRollbackUsingMarkers() throws Exception {
        final boolean useMarkers = true;
        testCOWToMORConvertedTableRollback(useMarkers);
    }

    /**
     * Exercises rollback of failed delta commits and of a compaction.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Upsert 200 inserts and commit them as instant "001".</li>
     *   <li>With a second client (small-file handling off), upsert instant "002" and roll it back;
     *       no base file whose name contains "002" may remain.</li>
     *   <li>With a third client, upsert instant "002" again and roll it back, then upsert and
     *       commit instant "003".</li>
     *   <li>Schedule and run a compaction, roll it back, and verify that no latest base file
     *       carries the compaction's instant time.</li>
     * </ol>
     *
     * <p>Every write client is held in a try-with-resources block so it is closed even when a
     * write or an assertion fails, and a failure while closing never hides the original failure.
     *
     * @param bool second argument of {@code getConfigBuilder}; the public tests pass {@code false}
     *             for the "UsingFileList" variant and {@code true} for the "UsingMarkers" variant
     * @param z    forwarded to {@code addConfigsForPopulateMetaFields}
     * @throws Exception if any write, rollback, compaction or assertion fails
     */
    private void testRollbackWithDeltaAndCompactionCommit(Boolean bool, boolean z) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder((Boolean) false, bool, HoodieIndex.IndexType.SIMPLE);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Write 1: inserts only, committed as "001".
            client.startCommitWithTime("001");
            List insertedRecords = this.dataGen.generateInserts("001", 200);
            JavaRDD insertStatuses = client.upsert(this.jsc.parallelize(insertedRecords, 1), "001");
            client.commit("001", insertStatuses);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(insertStatuses.collect());

            this.metaClient = getHoodieMetaClient(this.hadoopConf, cfg.getBasePath());
            HoodieSparkTable hoodieTable = HoodieSparkTable.create(cfg, this.context, this.metaClient);
            Option firstDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstDeltaCommit.isPresent());
            Assertions.assertEquals("001", ((HoodieInstant) firstDeltaCommit.get()).getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            // The commit timeline shows no base files; the completed-commits timeline of the table does.
            FileStatus[] baseFilesAfterInsert = listAllBaseFilesInPath(hoodieTable);
            this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), baseFilesAfterInsert);
            Assertions.assertFalse(this.tableView.getLatestBaseFiles().findAny().isPresent());
            this.tableView = getHoodieTableFileSystemView(this.metaClient, hoodieTable.getCompletedCommitsTimeline(), baseFilesAfterInsert);
            Assertions.assertTrue(this.tableView.getLatestBaseFiles().findAny().isPresent(), "should list the base files we wrote in the delta commit");

            // Write 2: inserts + updates as "002" through a second client, never committed, then rolled back.
            try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false))) {
                secondClient.startCommitWithTime("002");
                List secondBatch = this.dataGen.generateUpdates("002", new ArrayList(insertedRecords));
                secondBatch.addAll(this.dataGen.generateInserts("002", 200));

                List pathsBeforeWrite = (List) this.tableView.getLatestBaseFiles()
                        .map(baseFile -> baseFile.getPath())
                        .collect(Collectors.toList());
                Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, pathsBeforeWrite, this.basePath).size());

                org.apache.hudi.testutils.Assertions.assertNoWriteErrors(secondClient.upsert(this.jsc.parallelize(secondBatch, 1), "002").collect());
                secondClient.rollback("002");

                List leftoverFiles = (List) Arrays.stream(listAllBaseFilesInPath(hoodieTable))
                        .filter(status -> status.getPath().getName().contains("002"))
                        .map(status -> status.getPath().toString())
                        .collect(Collectors.toList());
                Assertions.assertEquals(0, leftoverFiles.size(), "There files should have been rolled-back when rolling back commit 002 but are still remaining. Files: " + leftoverFiles);

                List pathsAfterRollback = (List) this.tableView.getLatestBaseFiles()
                        .map(baseFile -> baseFile.getPath())
                        .collect(Collectors.toList());
                Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, pathsAfterRollback, this.basePath).size());
            }

            // Write 3: "002" again through a third client (rolled back), then "003" (committed) and a compaction.
            try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg)) {
                thirdClient.startCommitWithTime("002");
                List thirdBatch = this.dataGen.generateUpdates("002", new ArrayList(insertedRecords));
                thirdBatch.addAll(this.dataGen.generateInserts("002", 200));

                List pathsBeforeWrite = (List) this.tableView.getLatestBaseFiles()
                        .map(baseFile -> baseFile.getPath())
                        .collect(Collectors.toList());
                Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, pathsBeforeWrite, this.basePath).size());

                // The same RDD is written for "002" and, after the rollback, for "003".
                JavaRDD thirdBatchRdd = this.jsc.parallelize(thirdBatch, 1);
                org.apache.hudi.testutils.Assertions.assertNoWriteErrors(thirdClient.upsert(thirdBatchRdd, "002").collect());
                thirdClient.rollback("002");

                FileStatus[] baseFilesAfterRollback = listAllBaseFilesInPath(hoodieTable);
                Assertions.assertEquals(0L, Arrays.stream(baseFilesAfterRollback)
                        .filter(status -> status.getPath().getName().contains("002"))
                        .count());

                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                HoodieSparkTable reloadedTable = HoodieSparkTable.create(cfg, this.context, this.metaClient);
                this.tableView = getHoodieTableFileSystemView(this.metaClient, reloadedTable.getCompletedCommitsTimeline(), baseFilesAfterRollback);
                List pathsAfterRollback = (List) this.tableView.getLatestBaseFiles()
                        .map(baseFile -> baseFile.getPath())
                        .collect(Collectors.toList());
                Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, pathsAfterRollback, this.basePath).size());

                thirdClient.startCommitWithTime("003");
                JavaRDD thirdStatuses = thirdClient.upsert(thirdBatchRdd, "003");
                List collectedThirdStatuses = thirdStatuses.collect();
                thirdClient.commit("003", thirdStatuses);
                org.apache.hudi.testutils.Assertions.assertNoWriteErrors(collectedThirdStatuses);
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

                // Schedule a compaction and run it under the instant time that scheduling returned.
                thirdClient.compact(thirdClient.scheduleCompaction(Option.empty()).get().toString());
                FileStatus[] baseFilesAfterCompaction = listAllBaseFilesInPath(reloadedTable);
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline(), baseFilesAfterCompaction);

                final String compactedInstantTime = ((HoodieInstant) this.metaClient.getActiveTimeline().reload().lastInstant().get()).getTimestamp();
                Assertions.assertTrue(Arrays.stream(listAllBaseFilesInPath(reloadedTable))
                        .anyMatch(status -> compactedInstantTime.equals(new HoodieBaseFile(status).getCommitTime())));

                // Roll the compaction back; afterwards no latest base file may carry its instant time.
                thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactedInstantTime), reloadedTable);
                FileStatus[] baseFilesAfterCompactionRollback = listAllBaseFilesInPath(reloadedTable);
                this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
                this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline(), baseFilesAfterCompactionRollback);
                Assertions.assertFalse(this.tableView.getLatestBaseFiles()
                        .anyMatch(baseFile -> compactedInstantTime.equals(baseFile.getCommitTime())));
            }
        }
    }

    /**
     * Resets the fixture with the default base file format and runs the delta-commit/compaction
     * rollback scenario with {@code false} as its first argument (the "file list" variant).
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to {@code init} and to the scenario
     * @throws Exception if fixture setup or the scenario fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testRollbackWithDeltaAndCompactionCommitUsingFileList(boolean z) throws Exception {
        final boolean rollbackUsingMarkers = false;
        clean();
        HoodieFileFormat defaultBaseFileFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        init(defaultBaseFileFormat, z);
        testRollbackWithDeltaAndCompactionCommit(rollbackUsingMarkers, z);
    }

    /**
     * Resets the fixture with the default base file format and runs the delta-commit/compaction
     * rollback scenario with {@code true} as its first argument (the "markers" variant).
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to {@code init} and to the scenario
     * @throws Exception if fixture setup or the scenario fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(boolean z) throws Exception {
        final boolean rollbackUsingMarkers = true;
        clean();
        HoodieFileFormat defaultBaseFileFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        init(defaultBaseFileFormat, z);
        testRollbackWithDeltaAndCompactionCommit(rollbackUsingMarkers, z);
    }

    /**
     * Builds up a timeline of several delta commits and compactions, restores the table to
     * instant "000", and verifies that nothing written afterwards is still visible.
     *
     * <p>Timeline produced before the restore: delta commits "001", "002", "003", a compaction
     * scheduled at "004", delta commit "005", a compaction scheduled, run and committed at "006",
     * and delta commit "007".
     *
     * <p>The primary write client is held in a try-with-resources block, so it is closed exactly
     * once on every path and a failure while closing is attached to the original failure as a
     * suppressed exception.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to {@code init},
     *          {@code addConfigsForPopulateMetaFields} and {@code getHoodieWriteConfigWithSmallFileHandlingOff}
     * @throws Exception if any write, compaction, restore or assertion fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testMultiRollbackWithDeltaAndCompactionCommit(boolean z) throws Exception {
        clean();
        init((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), z);
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Write 1: inserts only, committed as "001".
            client.startCommitWithTime("001");
            List insertedRecords = this.dataGen.generateInserts("001", 200);
            JavaRDD firstStatuses = client.upsert(this.jsc.parallelize(insertedRecords, 1), "001");
            client.commit("001", firstStatuses);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(firstStatuses.collect());

            this.metaClient = getHoodieMetaClient(this.hadoopConf, cfg.getBasePath());
            HoodieSparkTable hoodieTable = HoodieSparkTable.create(cfg, this.context, this.metaClient);
            Option firstDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstDeltaCommit.isPresent());
            Assertions.assertEquals("001", ((HoodieInstant) firstDeltaCommit.get()).getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            FileStatus[] baseFilesAfterInsert = listAllBaseFilesInPath(hoodieTable);
            this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), baseFilesAfterInsert);
            Assertions.assertFalse(this.tableView.getLatestBaseFiles().findAny().isPresent());
            this.tableView = getHoodieTableFileSystemView(this.metaClient, hoodieTable.getCompletedCommitsTimeline(), baseFilesAfterInsert);
            Assertions.assertTrue(this.tableView.getLatestBaseFiles().findAny().isPresent(), "Should list the base files we wrote in the delta commit");

            // Write 2: inserts + updates as "002" through a second client with small-file handling off.
            // NOTE(review): this second client is not closed in this method; presumably the test
            // harness cleans it up - confirm before wrapping it in its own try-with-resources.
            SparkRDDWriteClient smallFileOffClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(z));
            smallFileOffClient.startCommitWithTime("002");
            List secondBatch = this.dataGen.generateUpdates("002", new ArrayList(insertedRecords));
            secondBatch.addAll(this.dataGen.generateInserts("002", 200));
            List pathsBeforeSecondWrite = (List) this.tableView.getLatestBaseFiles()
                    .map(baseFile -> baseFile.getPath())
                    .collect(Collectors.toList());
            Assertions.assertEquals(200, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, pathsBeforeSecondWrite, this.basePath).size());
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(smallFileOffClient.upsert(this.jsc.parallelize(secondBatch, 1), "002").collect());
            // NOTE(review): "002" is committed with the status RDD of write "001", not with the
            // statuses of the upsert above; kept as-is to preserve the test's behavior - confirm intent.
            smallFileOffClient.commit("002", firstStatuses);
            secondBatch.clear();

            // Write 3: 100 inserts + updates of the original records, committed as "003".
            client.startCommitWithTime("003");
            List thirdInserts = this.dataGen.generateInserts("003", 100);
            List thirdBatch = this.dataGen.generateUpdates("003", insertedRecords);
            thirdBatch.addAll(thirdInserts);
            JavaRDD thirdStatuses = client.upsert(this.jsc.parallelize(thirdBatch, 1), "003");
            client.commit("003", thirdStatuses);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(thirdStatuses.collect());
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

            // Compaction scheduled at "004" (not executed here).
            client.scheduleCompactionAtInstant("004", Option.empty());

            // Write 4: updates only, committed as "005".
            client.startCommitWithTime("005");
            List fifthBatch = this.dataGen.generateUpdates("005", thirdBatch);
            JavaRDD fifthStatuses = client.upsert(this.jsc.parallelize(fifthBatch, 1), "005");
            client.commit("005", fifthStatuses);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(fifthStatuses.collect());
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

            // Compaction scheduled, executed and committed at "006".
            client.scheduleCompactionAtInstant("006", Option.empty());
            client.commitCompaction("006", (JavaRDD) client.compact("006"), Option.empty());
            FileStatus[] baseFilesAfterCompaction = listAllBaseFilesInPath(hoodieTable);
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline(), baseFilesAfterCompaction);
            final String lastCommitTime = ((HoodieInstant) this.metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()).getTimestamp();
            Assertions.assertTrue(this.tableView.getLatestBaseFiles()
                    .anyMatch(baseFile -> lastCommitTime.equals(baseFile.getCommitTime())));

            // Write 5: inserts + updates, committed as "007".
            client.startCommitWithTime("007");
            List seventhBatch = this.dataGen.generateUpdates("007", new ArrayList(fifthBatch));
            seventhBatch.addAll(this.dataGen.generateInserts("007", 200));
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(seventhBatch, 1), "007").collect());
            // NOTE(review): "007" is committed with the status RDD of write "005"; kept as-is to
            // preserve the test's behavior - confirm intent.
            client.commit("007", fifthStatuses);
            seventhBatch.clear();

            // Restore to "000" and verify that no base files and no file groups remain visible.
            client.restoreToInstant("000");
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            FileStatus[] baseFilesAfterRestore = listAllBaseFilesInPath(hoodieTable);
            this.tableView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), baseFilesAfterRestore);
            Assertions.assertFalse(this.tableView.getLatestBaseFiles().findAny().isPresent());

            HoodieTableFileSystemView restoredView = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), baseFilesAfterRestore);
            Assertions.assertTrue(((List) restoredView.getAllFileGroups().collect(Collectors.toList())).isEmpty());
            Assertions.assertEquals(0L, restoredView.getAllFileGroups()
                    .filter(fileGroup -> fileGroup.getAllRawFileSlices()
                            .noneMatch(slice -> slice.getLogFiles().count() == 0))
                    .count());
        }
    }

    /**
     * Builds a write config for table "test-trip-table" at {@code basePath} with the trip-record
     * Avro schema, parallelism 2, auto-commit off, the embedded timeline server enabled, a
     * compaction small-file size of 1024, inline compaction off, and 1024 as the maximum
     * parquet/hfile size.
     *
     * @param z when {@code false}, {@code addConfigsForPopulateMetaFields(builder, false)} is
     *          applied to the builder before it is built; when {@code true} the builder is built as-is
     * @return the built write config
     */
    protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean z) {
        String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
                .withPath(this.basePath)
                .withSchema(tripSchema)
                .withParallelism(2, 2)
                .withDeleteParallelism(2)
                .withAutoCommit(false)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .compactionSmallFileSize(1024L)
                        .withInlineCompaction(false)
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .build())
                .withEmbeddedTimelineServerEnabled(true)
                .withStorageConfig(HoodieStorageConfig.newBuilder()
                        .hfileMaxFileSize(1024L)
                        .parquetMaxFileSize(1024L)
                        .build())
                .forTable("test-trip-table");

        if (z) {
            return cfgBuilder.build();
        }
        addConfigsForPopulateMetaFields(cfgBuilder, false);
        return cfgBuilder.build();
    }

    /**
     * Writes 20 inserts as instant "001" and then 20 updates plus 20 new inserts as instant
     * "002", and verifies that at least one base file grew between the two writes and that the
     * latest base files contain 40 records in total.
     *
     * <p>The write client is held in a try-with-resources block, so it is closed exactly once on
     * every path and a failure while closing cannot replace the original test failure.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; passed to {@code init} and
     *          {@code addConfigsForPopulateMetaFields}
     * @throws Exception if any write or assertion fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testUpsertPartitioner(boolean z) throws Exception {
        clean();
        init((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), z);
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            // Write 1: 20 inserts as "001".
            client.startCommitWithTime("001");
            List insertedRecords = this.dataGen.generateInserts("001", 20);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(insertedRecords, 1), "001").collect());

            this.metaClient = getHoodieMetaClient(this.hadoopConf, cfg.getBasePath());
            HoodieSparkTable hoodieTable = HoodieSparkTable.create(cfg, this.context, this.metaClient);
            Option firstDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstDeltaCommit.isPresent());
            Assertions.assertEquals("001", ((HoodieInstant) firstDeltaCommit.get()).getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            // Remember the size of every latest base file so growth can be detected after write 2.
            FileStatus[] baseFilesAfterInsert = listAllBaseFilesInPath(hoodieTable);
            final Map sizesBefore = (Map) getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(), baseFilesAfterInsert)
                    .getLatestBaseFiles()
                    .collect(Collectors.toMap(baseFile -> baseFile.getFileId(), baseFile -> baseFile.getFileSize()));
            Assertions.assertTrue(((List) getHoodieTableFileSystemView(this.metaClient, hoodieTable.getCompletedCommitsTimeline(), baseFilesAfterInsert)
                    .getLatestBaseFiles()
                    .collect(Collectors.toList())).size() > 0, "Should list the base files we wrote in the delta commit");

            // Write 2: 20 updates + 20 inserts as "002".
            client.startCommitWithTime("002");
            List secondBatch = this.dataGen.generateUpdates("002", insertedRecords);
            secondBatch.addAll(this.dataGen.generateInserts("002", 20));
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(secondBatch), "002").collect());

            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            Option lastDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(lastDeltaCommit.isPresent());
            Assertions.assertEquals("002", ((HoodieInstant) lastDeltaCommit.get()).getTimestamp(), "Latest Delta commit should be 002");
            Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            HoodieTableFileSystemView viewAfterUpsert = getHoodieTableFileSystemView(this.metaClient, hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), listAllBaseFilesInPath(hoodieTable));
            Map sizesAfter = (Map) viewAfterUpsert.getLatestBaseFiles()
                    .collect(Collectors.toMap(baseFile -> baseFile.getFileId(), baseFile -> baseFile.getFileSize()));
            // At least one base file must be larger than it was after write 1.
            Assertions.assertTrue(sizesAfter.entrySet().stream().anyMatch(entry -> {
                Map.Entry sizeEntry = (Map.Entry) entry;
                return ((Long) sizesBefore.get(sizeEntry.getKey())).longValue() < ((Long) sizeEntry.getValue()).longValue();
            }));

            List latestPaths = (List) viewAfterUpsert.getLatestBaseFiles()
                    .map(baseFile -> baseFile.getPath())
                    .collect(Collectors.toList());
            Assertions.assertEquals(40, HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, latestPaths, this.basePath, new JobConf(this.hadoopConf), true, false).size(), "Must contain 40 records");
        }
    }

    /**
     * Verifies log file counts per file slice before and after a compaction.
     *
     * <p>Inserts 100 records as instant "100", appends tagged updates for instant "101" as log
     * files, and checks that every latest file slice has exactly one log file. It then marks
     * "101" complete, schedules and runs a compaction, and checks that no latest file slice
     * shows a log file and that every partition appears in the compaction's write statuses.
     *
     * <p>The write client is held in a try-with-resources block, so it is closed exactly once on
     * every path and a failure while closing is attached to the original failure as a suppressed
     * exception.
     *
     * @param z value supplied by {@code populateMetaFieldsParams}; forwarded to
     *          {@code addConfigsForPopulateMetaFields}
     * @throws Exception if any write, compaction or assertion fails
     */
    @MethodSource({"populateMetaFieldsParams"})
    @ParameterizedTest
    public void testLogFileCountsAfterCompaction(boolean z) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(cfgBuilder, z);
        HoodieWriteConfig cfg = cfgBuilder.build();

        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
            // Instant "100": 100 inserts.
            writeClient.startCommitWithTime("100");
            List insertedRecords = this.dataGen.generateInserts("100", 100);
            writeClient.insert(this.jsc.parallelize(insertedRecords, 1), "100").collect();
            this.metaClient = getHoodieMetaClient(this.hadoopConf, this.basePath);

            // Instant "101": tag updates with their locations and append them as log files.
            writeClient.startCommitWithTime("101");
            List taggedUpdates = new HoodieReadClient(this.context, cfg).tagLocation(this.jsc.parallelize(this.dataGen.generateUpdates("101", insertedRecords), 1)).collect();
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            HoodieSparkTable tableBeforeCompaction = HoodieSparkTable.create(cfg, this.context, this.metaClient);
            HoodieSparkWriteableTestTable.of((HoodieTable) tableBeforeCompaction, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS).withLogAppends(taggedUpdates);
            tableBeforeCompaction.getSliceView().reset();

            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                List slices = (List) tableBeforeCompaction.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
                for (Object slice : slices) {
                    Assertions.assertEquals(1L, ((FileSlice) slice).getLogFiles().count(), "There should be 1 log file written for every data file");
                }
            }

            // Mark delta commit "101" as inflight and then complete on the timeline.
            this.metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, "deltacommit", "101"));
            this.metaClient.getActiveTimeline().saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, "deltacommit", "101"), Option.empty());

            // Schedule a compaction and run it under the instant time that scheduling returned.
            JavaRDD compactionStatuses = (JavaRDD) writeClient.compact(writeClient.scheduleCompaction(Option.empty()).get().toString());
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            HoodieSparkTable tableAfterCompaction = HoodieSparkTable.create(cfg, this.context, this.metaClient);
            Assertions.assertTrue(HoodieTimeline.compareTimestamps(((HoodieInstant) this.metaClient.getActiveTimeline().lastInstant().get()).getTimestamp(), HoodieTimeline.GREATER_THAN, "101"), "Compaction commit should be > than last insert");

            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                List slices = (List) tableAfterCompaction.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
                for (Object slice : slices) {
                    Assertions.assertEquals(0L, ((FileSlice) slice).getLogFiles().count(), "After compaction there should be no log files visible on a full view");
                }
                // The statuses are collected once per partition, as in the original test flow.
                Assertions.assertTrue(compactionStatuses.collect().stream().anyMatch(writeStatus -> {
                    return writeStatus.getStat().getPartitionPath().contentEquals(partitionPath);
                }));
            }
        }
    }

    /**
     * Inserts 100 records at instant "100" using the in-memory index and checks that, for every
     * partition, the latest file slices carry log files but no base file. It then schedules and
     * runs a compaction and checks that one write status per log-file-bearing slice is produced
     * and that each status path contains the table's base file extension.
     *
     * @throws Exception if any write, compaction or close operation fails
     */
    @Test
    public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
        HoodieWriteConfig build = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).build();
        // try-with-resources closes the client exactly once and keeps the primary failure as the
        // thrown exception (a failing close() is attached as suppressed instead of replacing it).
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build)) {
            hoodieWriteClient.startCommitWithTime("100");
            hoodieWriteClient.commit("100", hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("100", 100), 1), "100"));
            HoodieSparkTable create = HoodieSparkTable.create(build, this.context, getHoodieMetaClient(this.hadoopConf, this.basePath));
            TableFileSystemView.SliceView sliceView = create.getSliceView();

            long numLogFiles = 0;
            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                List<FileSlice> slices = sliceView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
                Assertions.assertEquals(0L, slices.stream().filter(slice -> slice.getBaseFile().isPresent()).count());
                Assertions.assertTrue(slices.stream().anyMatch(slice -> slice.getLogFiles().count() > 0));
                long slicesWithLogs = slices.stream().filter(slice -> slice.getLogFiles().count() > 0).count();
                if (slicesWithLogs > 0) {
                    // The first log file of every slice must still be at the base log version.
                    Assertions.assertTrue(slices.stream()
                        .map(slice -> slice.getLogFiles().findFirst().get().getLogVersion())
                        .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
                }
                numLogFiles += slicesWithLogs;
            }
            Assertions.assertTrue(numLogFiles > 0);

            String compactionInstant = hoodieWriteClient.scheduleCompaction(Option.empty()).get().toString();
            JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) hoodieWriteClient.compact(compactionInstant);
            String baseFileExtension = create.getBaseFileExtension();
            Assertions.assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(baseFileExtension)).count());
            Assertions.assertEquals(numLogFiles, statuses.count());
            hoodieWriteClient.commitCompaction(compactionInstant, statuses, Option.empty());
        }
    }

    /**
     * Shared body for the "rollback of inserts written to log files" tests.
     *
     * <p>Steps: insert at "100" and check every write status path contains "log"; roll "100"
     * back; insert at "101" without committing; save local copies of the REQUESTED/INFLIGHT
     * instant files (and the marker folder when {@code bool} is true); roll "101" back and check
     * that no latest file slice has a base file or log files; restore the saved files and roll
     * "101" back a second time.
     *
     * @param bool passed to the config builder as the rollback-using-markers flag; when true the
     *             marker folder of instant "101" is also saved and restored
     * @throws Exception if any write, rollback, file copy or close operation fails
     */
    private void testInsertsGeneratedIntoLogFilesRollback(Boolean bool) throws Exception {
        HoodieWriteConfig build = getConfigBuilder(Boolean.FALSE, bool, HoodieIndex.IndexType.INMEMORY).build();
        // try-with-resources closes the client exactly once, even when close() itself fails.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build)) {
            hoodieWriteClient.startCommitWithTime("100");
            List<WriteStatus> firstStatuses = hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("100", 100), 1), "100").collect();
            Assertions.assertEquals(0L, firstStatuses.stream().filter(status -> !status.getStat().getPath().contains("log")).count());
            Assertions.assertTrue(firstStatuses.stream().anyMatch(status -> status.getStat().getPath().contains("log")));
            Assertions.assertTrue(hoodieWriteClient.rollback("100"));

            hoodieWriteClient.startCommitWithTime("101");
            hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("101", 100), 1), "101").collect();
            Thread.sleep(1000L);
            this.metaClient = getHoodieMetaClient(this.hadoopConf, this.basePath);

            // Local copy path -> instant file name, so the instant files can be put back later.
            Map<String, String> savedInstantFiles = new HashMap<>();
            for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
                HoodieInstant instant = new HoodieInstant(state, "deltacommit", "101");
                File localCopy = Files.createTempFile(this.tempFolder, null, null).toFile();
                this.metaClient.getFs().copyToLocalFile(new org.apache.hadoop.fs.Path(this.metaClient.getMetaPath(), instant.getFileName()), new org.apache.hadoop.fs.Path(localCopy.getAbsolutePath()));
                savedInstantFiles.put(localCopy.getAbsolutePath(), instant.getFileName());
            }
            org.apache.hadoop.fs.Path markerBackupDir = new org.apache.hadoop.fs.Path(Files.createTempDirectory(this.tempFolder, null).toAbsolutePath().toString());
            if (bool.booleanValue()) {
                this.metaClient.getFs().copyToLocalFile(new org.apache.hadoop.fs.Path(this.metaClient.getMarkerFolderPath("101")), markerBackupDir);
            }

            hoodieWriteClient.rollback("101");
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            TableFileSystemView.SliceView sliceView = HoodieSparkTable.create(build, this.context).getSliceView();
            long numLogFiles = 0;
            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                Assertions.assertTrue(sliceView.getLatestFileSlices(partitionPath).noneMatch(slice -> slice.getBaseFile().isPresent()));
                Assertions.assertTrue(sliceView.getLatestFileSlices(partitionPath).noneMatch(slice -> slice.getLogFiles().count() > 0));
                numLogFiles += sliceView.getLatestFileSlices(partitionPath).filter(slice -> slice.getLogFiles().count() > 0).count();
            }
            Assertions.assertEquals(0L, numLogFiles);

            // Put the saved instant files (and markers) back, then roll back "101" again.
            savedInstantFiles.forEach((localPath, instantFileName) -> {
                try {
                    this.metaClient.getFs().copyFromLocalFile(new org.apache.hadoop.fs.Path(localPath), new org.apache.hadoop.fs.Path(this.metaClient.getMetaPath(), instantFileName));
                } catch (IOException e) {
                    throw new HoodieIOException("Error copying state from local disk.", e);
                }
            });
            if (bool.booleanValue()) {
                this.metaClient.getFs().copyFromLocalFile(markerBackupDir, new org.apache.hadoop.fs.Path(this.metaClient.getMarkerFolderPath("101")));
            }
            Thread.sleep(1000L);
            hoodieWriteClient.rollback("101");
        }
    }

    /**
     * Runs the log-file insert rollback scenario with the marker-based rollback flag disabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.FALSE;
        testInsertsGeneratedIntoLogFilesRollback(rollbackUsingMarkers);
    }

    /**
     * Runs the log-file insert rollback scenario with the marker-based rollback flag enabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.TRUE;
        testInsertsGeneratedIntoLogFilesRollback(rollbackUsingMarkers);
    }

    /**
     * Shared body for the "rollback an inflight compaction" tests.
     *
     * <p>Steps: insert and commit at "100"; check that the latest slices of every partition have
     * log files and no base file; schedule and run a compaction (without committing it); roll the
     * inflight compaction back; then check that the file slices whose base instant is "100"
     * still have log files and no base file.
     *
     * @param bool passed to the config builder as the rollback-using-markers flag
     * @throws Exception if any write, compaction, rollback or close operation fails
     */
    private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(Boolean bool) throws Exception {
        HoodieWriteConfig build = getConfigBuilder(Boolean.FALSE, bool, HoodieIndex.IndexType.INMEMORY).build();
        // try-with-resources closes the client exactly once, even when close() itself fails.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build)) {
            hoodieWriteClient.startCommitWithTime("100");
            hoodieWriteClient.commit("100", hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("100", 100), 1), "100"));
            HoodieSparkTable create = HoodieSparkTable.create(build, this.context, getHoodieMetaClient(this.hadoopConf, this.basePath));
            TableFileSystemView.SliceView sliceView = create.getSliceView();

            long numLogFiles = 0;
            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                Assertions.assertTrue(sliceView.getLatestFileSlices(partitionPath).noneMatch(slice -> slice.getBaseFile().isPresent()));
                Assertions.assertTrue(sliceView.getLatestFileSlices(partitionPath).anyMatch(slice -> slice.getLogFiles().count() > 0));
                numLogFiles += sliceView.getLatestFileSlices(partitionPath).filter(slice -> slice.getLogFiles().count() > 0).count();
            }
            Assertions.assertTrue(numLogFiles > 0);

            String compactionInstant = hoodieWriteClient.scheduleCompaction(Option.empty()).get().toString();
            JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) hoodieWriteClient.compact(compactionInstant);
            String baseFileExtension = create.getBaseFileExtension();
            Assertions.assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(baseFileExtension)).count());
            Assertions.assertEquals(numLogFiles, statuses.count());

            // The compaction is deliberately left uncommitted so it can be rolled back as inflight.
            create.getActiveTimeline().reload();
            hoodieWriteClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionInstant), create);
            HoodieSparkTable.create(build, this.context, getHoodieMetaClient(this.hadoopConf, this.basePath)).getSliceView().reset();

            for (String partitionPath : this.dataGen.getPartitionPaths()) {
                List<FileSlice> slicesAtFirstInstant = getFileSystemViewWithUnCommittedSlices(getHoodieMetaClient(this.hadoopConf, this.basePath))
                    .getAllFileSlices(partitionPath)
                    .filter(slice -> slice.getBaseInstantTime().equals("100"))
                    .collect(Collectors.toList());
                Assertions.assertTrue(slicesAtFirstInstant.stream().noneMatch(slice -> slice.getBaseFile().isPresent()));
                Assertions.assertTrue(slicesAtFirstInstant.stream().anyMatch(slice -> slice.getLogFiles().count() > 0));
            }
        }
    }

    /**
     * Runs the inflight-compaction rollback scenario with the marker-based rollback flag disabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.FALSE;
        testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(rollbackUsingMarkers);
    }

    /**
     * Runs the inflight-compaction rollback scenario with the marker-based rollback flag enabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.TRUE;
        testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(rollbackUsingMarkers);
    }

    /**
     * Shared body for the commit-metadata statistics tests.
     *
     * <p>Steps: create an empty completed instant "000"; insert 200 records at "001" and commit;
     * check the last delta commit's metadata reports 200 inserts; upsert updates at "002"
     * (not committed) and check the returned write statuses report 0 inserts / 200 updates;
     * roll "002" back and check the last delta commit's metadata again reports 200 inserts and
     * 0 updates.
     *
     * @param bool passed to the config builder as the rollback-using-markers flag
     * @throws Exception if any timeline, write, rollback or close operation fails
     */
    public void testMetadataStatsOnCommit(Boolean bool) throws Exception {
        HoodieWriteConfig build = getConfigBuilder(Boolean.FALSE, bool, HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).build();
        // try-with-resources closes the client exactly once, even when close() itself fails.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build)) {
            this.metaClient = getHoodieMetaClient(this.hadoopConf, this.basePath);
            HoodieSparkTable initialTable = HoodieSparkTable.create(build, this.context, this.metaClient);

            // Seed the timeline with an empty completed instant "000".
            HoodieActiveTimeline activeTimeline = initialTable.getActiveTimeline();
            String commitActionType = initialTable.getMetaClient().getCommitActionType();
            HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitActionType, "000");
            activeTimeline.createNewInstant(requested);
            activeTimeline.transitionRequestedToInflight(requested, Option.empty());
            activeTimeline.saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActionType, "000"), Option.empty());

            hoodieWriteClient.startCommitWithTime("001");
            List<HoodieRecord> insertedRecords = this.dataGen.generateInserts("001", 200);
            Assertions.assertTrue(hoodieWriteClient.commit("001", hoodieWriteClient.insert(this.jsc.parallelize(insertedRecords, 1), "001")), "Commit should succeed");

            HoodieSparkTable tableAfterInsert = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata insertMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterInsert.getActiveTimeline().getInstantDetails(tableAfterInsert.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            int committedInserts = 0;
            for (List<HoodieWriteStat> partitionStats : insertMetadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    committedInserts += stat.getNumInserts();
                }
            }
            Assertions.assertEquals(200, committedInserts);

            hoodieWriteClient.startCommitWithTime("002");
            List<WriteStatus> upsertStatuses = hoodieWriteClient.upsert(this.jsc.parallelize(this.dataGen.generateUpdates("002", insertedRecords), 1), "002").collect();
            int upsertInserts = 0;
            int upsertUpdates = 0;
            for (WriteStatus status : upsertStatuses) {
                upsertInserts += status.getStat().getNumInserts();
                upsertUpdates += status.getStat().getNumUpdateWrites();
            }
            Assertions.assertEquals(0, upsertInserts);
            Assertions.assertEquals(200, upsertUpdates);

            hoodieWriteClient.rollback("002");

            // After the rollback the last delta commit is "001" again: inserts only.
            HoodieSparkTable tableAfterRollback = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata rollbackMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterRollback.getActiveTimeline().getInstantDetails(tableAfterRollback.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            int insertsAfterRollback = 0;
            int updatesAfterRollback = 0;
            for (List<HoodieWriteStat> partitionStats : rollbackMetadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    insertsAfterRollback += stat.getNumInserts();
                    updatesAfterRollback += stat.getNumUpdateWrites();
                }
            }
            Assertions.assertEquals(200, insertsAfterRollback);
            Assertions.assertEquals(0, updatesAfterRollback);
        }
    }

    /**
     * Runs the commit-metadata statistics scenario with the marker-based rollback flag disabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testMetadataStatsOnCommitUsingFileList() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.FALSE;
        testMetadataStatsOnCommit(rollbackUsingMarkers);
    }

    /**
     * Runs the commit-metadata statistics scenario with the marker-based rollback flag enabled.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    public void testMetadataStatsOnCommitUsingMarkers() throws Exception {
        final Boolean rollbackUsingMarkers = Boolean.TRUE;
        testMetadataStatsOnCommit(rollbackUsingMarkers);
    }

    /**
     * Checks insert/update counts recorded in commit metadata across several commits that reuse
     * the same file ids.
     *
     * <p>Steps: insert 200 records at "000" and remember the file ids written; upsert 200 updates
     * plus 200 inserts at "001" and check every write stat targets a remembered file id with
     * totals 200/200; compact at "002" and check the compaction metadata covers the same
     * partitions with the same number of write stats; upsert again at "003" and check totals of
     * 200 inserts / 400 updates on the remembered file ids.
     *
     * @throws Exception if any write, compaction or close operation fails
     */
    @Test
    public void testRollingStatsWithSmallFileHandling() throws Exception {
        HoodieWriteConfig build = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).build();
        // try-with-resources closes the client exactly once, even when close() itself fails.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build)) {
            Map<String, Long> insertsByFileId = new HashMap<>();
            Map<String, Long> updatesByFileId = new HashMap<>();

            hoodieWriteClient.startCommitWithTime("000");
            List<HoodieRecord> firstInserts = this.dataGen.generateInserts("000", 200);
            Assertions.assertTrue(hoodieWriteClient.commit("000", hoodieWriteClient.insert(this.jsc.parallelize(firstInserts, 1), "000")), "Commit should succeed");

            HoodieSparkTable tableAfterFirstCommit = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata firstMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterFirstCommit.getActiveTimeline().getInstantDetails(tableAfterFirstCommit.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            int firstCommitInserts = 0;
            for (List<HoodieWriteStat> partitionStats : firstMetadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    firstCommitInserts += stat.getNumInserts();
                    insertsByFileId.put(stat.getFileId(), stat.getNumInserts());
                    updatesByFileId.put(stat.getFileId(), stat.getNumUpdateWrites());
                }
            }
            Assertions.assertEquals(200, firstCommitInserts);

            hoodieWriteClient.startCommitWithTime("001");
            List<HoodieRecord> secondBatch = this.dataGen.generateUpdates("001", firstInserts);
            secondBatch.addAll(this.dataGen.generateInserts("001", 200));
            Assertions.assertTrue(hoodieWriteClient.commit("001", hoodieWriteClient.upsert(this.jsc.parallelize(secondBatch, 1), "001")), "Commit should succeed");

            HoodieSparkTable tableAfterSecondCommit = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata secondMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterSecondCommit.getActiveTimeline().getInstantDetails(tableAfterSecondCommit.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            int secondCommitInserts = 0;
            int secondCommitUpdates = 0;
            for (List<HoodieWriteStat> partitionStats : secondMetadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    // New inserts must have been routed to file ids created by the first commit.
                    Assertions.assertTrue(insertsByFileId.containsKey(stat.getFileId()));
                    Assertions.assertTrue(updatesByFileId.containsKey(stat.getFileId()));
                    secondCommitInserts += stat.getNumInserts();
                    secondCommitUpdates += stat.getNumUpdateWrites();
                }
            }
            Assertions.assertEquals(200, secondCommitInserts);
            Assertions.assertEquals(200, secondCommitUpdates);

            hoodieWriteClient.scheduleCompactionAtInstant("002", Option.of(secondMetadata.getExtraMetadata()));
            JavaRDD<WriteStatus> compactionStatuses = (JavaRDD<WriteStatus>) hoodieWriteClient.compact("002");
            hoodieWriteClient.commitCompaction("002", compactionStatuses, Option.empty());

            HoodieSparkTable tableAfterCompaction = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterCompaction.getActiveTimeline().getInstantDetails(tableAfterCompaction.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            for (Map.Entry<String, List<HoodieWriteStat>> entry : secondMetadata.getPartitionToWriteStats().entrySet()) {
                Assertions.assertTrue(compactionMetadata.getPartitionToWriteStats().containsKey(entry.getKey()));
                Assertions.assertEquals(compactionMetadata.getPartitionToWriteStats().get(entry.getKey()).size(), entry.getValue().size());
            }

            hoodieWriteClient.startCommitWithTime("003");
            List<HoodieRecord> thirdBatch = this.dataGen.generateUpdates("003", secondBatch);
            thirdBatch.addAll(this.dataGen.generateInserts("003", 200));
            Assertions.assertTrue(hoodieWriteClient.commit("003", hoodieWriteClient.upsert(this.jsc.parallelize(thirdBatch, 1), "003")), "Commit should succeed");

            HoodieSparkTable tableAfterThirdCommit = HoodieSparkTable.create(build, this.context);
            HoodieCommitMetadata thirdMetadata = HoodieCommitMetadata.fromBytes(
                tableAfterThirdCommit.getActiveTimeline().getInstantDetails(tableAfterThirdCommit.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
                HoodieCommitMetadata.class);
            int thirdCommitInserts = 0;
            int thirdCommitUpdates = 0;
            for (List<HoodieWriteStat> partitionStats : thirdMetadata.getPartitionToWriteStats().values()) {
                for (HoodieWriteStat stat : partitionStats) {
                    Assertions.assertTrue(insertsByFileId.containsKey(stat.getFileId()));
                    thirdCommitInserts += stat.getNumInserts();
                    thirdCommitUpdates += stat.getNumUpdateWrites();
                }
            }
            Assertions.assertEquals(200, thirdCommitInserts);
            Assertions.assertEquals(400, thirdCommitUpdates);
        }
    }

    /**
     * Checks that {@code handleUpdate} on a delete executor reports errors for records that do
     * not belong to the partition it was invoked for.
     *
     * <p>Steps: upsert 20 new records at "001" and verify the delta commit and base-file
     * visibility; upsert updates at "002"; build delete records from those updates and invoke
     * {@code handleUpdate} for a single partition/file id at "004"; expect the returned write
     * status to have errors, one per record outside that partition.
     *
     * @throws Exception if any write or close operation fails
     */
    @Test
    public void testHandleUpdateWithMultiplePartitions() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        // try-with-resources closes the client exactly once and keeps the primary failure as the
        // thrown exception (a failing close() is attached as suppressed instead of replacing it).
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            hoodieWriteClient.startCommitWithTime("001");
            List<HoodieRecord> generateInserts = this.dataGen.generateInserts("001", 20);
            List<WriteStatus> insertStatuses = hoodieWriteClient.upsert(this.jsc.parallelize(generateInserts, 1), "001").collect();
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(insertStatuses);

            this.metaClient = getHoodieMetaClient(this.hadoopConf, config.getBasePath());
            // The factory returns the general table type; this test needs the merge-on-read one.
            HoodieSparkMergeOnReadTable create = (HoodieSparkMergeOnReadTable) HoodieSparkTable.create(config, this.context, this.metaClient);
            Option<HoodieInstant> firstInstant = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstInstant.isPresent());
            Assertions.assertEquals("001", firstInstant.get().getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
            FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
            Assertions.assertFalse(getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent());
            Assertions.assertTrue(getHoodieTableFileSystemView(this.metaClient, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent(), "should list the base files we wrote in the delta commit");

            hoodieWriteClient.startCommitWithTime("002");
            this.metaClient.reloadActiveTimeline();
            List<HoodieRecord> generateUpdates = this.dataGen.generateUpdates("002", generateInserts);
            List<WriteStatus> collect = hoodieWriteClient.upsert(this.jsc.parallelize(generateUpdates, 1), "002").collect();
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(collect);

            // Target a single partition / file id taken from the second upsert.
            String partitionPath = generateUpdates.get(0).getPartitionPath();
            String fileId = collect.get(0).getFileId();

            hoodieWriteClient.startCommitWithTime("004");
            this.metaClient.reloadActiveTimeline();
            List<HoodieRecord> generateDeletesFromExistingRecords = this.dataGen.generateDeletesFromExistingRecords(generateUpdates);
            JavaRDD<HoodieRecord> parallelize = this.jsc.parallelize(generateDeletesFromExistingRecords, 1);
            SparkDeleteDeltaCommitActionExecutor sparkDeleteDeltaCommitActionExecutor = new SparkDeleteDeltaCommitActionExecutor(this.context, config, create, "004", parallelize);
            // The partitioner is initialised before handleUpdate is called.
            sparkDeleteDeltaCommitActionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(parallelize)));
            WriteStatus writeStatus = (WriteStatus) ((List) this.jsc.parallelize(Arrays.asList(1)).map(num -> {
                return sparkDeleteDeltaCommitActionExecutor.handleUpdate(partitionPath, fileId, generateDeletesFromExistingRecords.iterator());
            }).map(Transformations::flatten).collect().get(0)).get(0);

            // Records from other partitions are expected to surface as errors.
            Assertions.assertTrue(writeStatus.hasErrors());
            long recordsInTargetPartition = generateDeletesFromExistingRecords.stream()
                .filter(hoodieRecord -> hoodieRecord.getPartitionPath().equals(partitionPath))
                .count();
            Assertions.assertEquals(generateDeletesFromExistingRecords.size() - recordsInTargetPartition, writeStatus.getTotalErrorRecords());
        }
    }

    /**
     * Builds a write config from the single-argument config builder.
     *
     * @param bool forwarded unchanged to {@code getConfigBuilder(Boolean)}
     * @return the built write config
     */
    private HoodieWriteConfig getConfig(Boolean bool) {
        HoodieWriteConfig.Builder builder = getConfigBuilder(bool);
        return builder.build();
    }

    /**
     * Builds a write config using the BLOOM index type.
     *
     * @param bool  first flag forwarded to the config builder
     * @param bool2 second flag forwarded to the config builder
     * @return the built write config
     */
    private HoodieWriteConfig getConfig(Boolean bool, Boolean bool2) {
        HoodieWriteConfig.Builder builder = getConfigBuilder(bool, bool2, HoodieIndex.IndexType.BLOOM);
        return builder.build();
    }

    /**
     * Returns a config builder that uses the BLOOM index type.
     *
     * @param bool forwarded unchanged to the two-argument overload
     * @return the config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool) {
        final HoodieIndex.IndexType defaultIndexType = HoodieIndex.IndexType.BLOOM;
        return getConfigBuilder(bool, defaultIndexType);
    }

    /**
     * Returns a config builder for the given index type, passing {@code false} as the second
     * flag of the three-argument overload.
     *
     * @param bool      forwarded unchanged as the first flag
     * @param indexType index type to configure
     * @return the config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, HoodieIndex.IndexType indexType) {
        final Boolean secondFlag = Boolean.FALSE;
        return getConfigBuilder(bool, secondFlag, indexType);
    }

    /**
     * Returns a config builder with a caller-supplied size value and clustering config, using
     * the BLOOM index type and {@code false} as the second flag.
     *
     * @param bool                   forwarded unchanged as the first flag
     * @param j                      size value forwarded to the five-argument overload
     * @param hoodieClusteringConfig clustering config forwarded to the five-argument overload
     * @return the config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, long j, HoodieClusteringConfig hoodieClusteringConfig) {
        final Boolean secondFlag = Boolean.FALSE;
        final HoodieIndex.IndexType defaultIndexType = HoodieIndex.IndexType.BLOOM;
        return getConfigBuilder(bool, secondFlag, defaultIndexType, j, hoodieClusteringConfig);
    }

    /**
     * Returns a config builder for the given flags and index type, using a size value of
     * 1073741824 (1 GiB) and a default-built clustering config.
     *
     * @param bool      forwarded unchanged as the first flag
     * @param bool2     forwarded unchanged as the second flag
     * @param indexType index type to configure
     * @return the config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, Boolean bool2, HoodieIndex.IndexType indexType) {
        final long oneGiB = 1024L * 1024L * 1024L;
        return getConfigBuilder(bool, bool2, indexType, oneGiB, HoodieClusteringConfig.newBuilder().build());
    }

    /**
     * Assembles the write-config builder that all other {@code getConfigBuilder} overloads
     * delegate to. The table is named "test-trip-table", uses the inlined "triprec" Avro schema,
     * and has inline compaction disabled.
     *
     * @param bool                   value for {@code withAutoCommit}
     * @param bool2                  value for {@code withRollbackUsingMarkers}
     * @param indexType              index type set on the index config
     * @param j                      value for {@code compactionSmallFileSize}
     * @param hoodieClusteringConfig clustering config to attach
     * @return the configured (not yet built) write-config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, Boolean bool2, HoodieIndex.IndexType indexType, long j, HoodieClusteringConfig hoodieClusteringConfig) {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        // Builder calls are kept in their original order; only the layout differs.
        return HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(tripSchema)
            .withParallelism(2, 2)
            .withDeleteParallelism(2)
            .withAutoCommit(bool.booleanValue())
            .withCompactionConfig(
                HoodieCompactionConfig.newBuilder()
                    .compactionSmallFileSize(j)
                    .withInlineCompaction(false)
                    .withMaxNumDeltaCommitsBeforeCompaction(1)
                    .build())
            .withStorageConfig(
                HoodieStorageConfig.newBuilder()
                    .hfileMaxFileSize(1073741824L)
                    .parquetMaxFileSize(1073741824L)
                    .build())
            .withEmbeddedTimelineServerEnabled(true)
            .forTable("test-trip-table")
            .withFileSystemViewConfig(
                new FileSystemViewStorageConfig.Builder()
                    .withEnableBackupForRemoteFileSystemView(false)
                    .build())
            .withIndexConfig(
                HoodieIndexConfig.newBuilder()
                    .withIndexType(indexType)
                    .build())
            .withClusteringConfig(hoodieClusteringConfig)
            .withRollbackUsingMarkers(bool2.booleanValue());
    }

    /**
     * Inserts the given records at the given instant and asserts the resulting table state:
     * no write errors, the last delta commit carries the given instant time, no instant exists
     * on the commit timeline, and base files are visible only through the table's completed
     * commits timeline.
     *
     * @param list                records to insert
     * @param sparkRDDWriteClient client used to perform the insert
     * @param hoodieWriteConfig   config used to locate and open the table
     * @param str                 instant time of the insert
     * @throws IOException declared for the file-listing and size-check helpers
     */
    private void insertRecords(List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        JavaRDD<HoodieRecord> inputRdd = this.jsc.parallelize(list, 1);
        List<WriteStatus> statuses = sparkRDDWriteClient.insert(inputRdd, str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
        assertFileSizes(statuses);

        this.metaClient = getHoodieMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
        HoodieSparkTable table = HoodieSparkTable.create(hoodieWriteConfig, this.context, this.metaClient);

        Option<HoodieInstant> lastDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        Assertions.assertTrue(lastDeltaCommit.isPresent());
        Assertions.assertEquals(str, lastDeltaCommit.get().getTimestamp(), "Delta commit should be specified value");
        Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().lastInstant().isPresent());

        FileStatus[] baseFiles = listAllBaseFilesInPath(table);
        // Seen through the completed commit timeline there must be no base files yet...
        boolean visibleViaCommitTimeline = getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), baseFiles)
            .getLatestBaseFiles()
            .findAny()
            .isPresent();
        Assertions.assertTrue(!visibleViaCommitTimeline);
        // ...while the table's completed commits timeline exposes them.
        boolean visibleViaCompletedCommits = getHoodieTableFileSystemView(this.metaClient, table.getCompletedCommitsTimeline(), baseFiles)
            .getLatestBaseFiles()
            .findAny()
            .isPresent();
        Assertions.assertTrue(visibleViaCompletedCommits, "should list the base files we wrote in the delta commit");
    }

    /**
     * Upserts {@code list} with the given write client at instant {@code str} and verifies the outcome.
     *
     * <p>Checks performed, in order: the write produced no errors, the reported file sizes match the
     * files on storage, the latest delta commit carries the timestamp {@code str}, and the commit
     * timeline has no instant.
     *
     * <p>Side effect: {@code this.metaClient} is replaced with a reloaded meta client.
     *
     * @param list records to upsert
     * @param sparkRDDWriteClient client used to perform the upsert
     * @param hoodieWriteConfig not read by this method; kept so existing callers keep compiling
     * @param str instant time of the upsert; expected to be the latest delta commit afterwards
     * @throws IOException if checking file sizes fails
     */
    private void updateRecords(List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        // The previous version first built a key -> record map that was never read; it has been dropped.
        List<WriteStatus> writeStatuses = sparkRDDWriteClient.upsert(this.jsc.parallelize(list, 1), str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
        assertFileSizes(writeStatuses);

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        Option<HoodieInstant> latestDeltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        Assertions.assertTrue(latestDeltaCommit.isPresent());
        Assertions.assertEquals(str, latestDeltaCommit.get().getTimestamp(), "Latest Delta commit should match specified time");
        Assertions.assertFalse(this.metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
    }

    /**
     * Asserts, for every write status, that the size of the written file on storage equals the size
     * recorded in the status' write stat.
     *
     * @param list write statuses to check
     * @throws IOException if a file size cannot be read from the file system
     */
    private void assertFileSizes(List<WriteStatus> list) throws IOException {
        for (WriteStatus status : list) {
            // The stat path is resolved against the table base path.
            org.apache.hadoop.fs.Path writtenFile = new org.apache.hadoop.fs.Path(this.metaClient.getBasePath(), status.getStat().getPath());
            Assertions.assertEquals(
                    FSUtils.getFileSize(this.metaClient.getFs(), writtenFile),
                    status.getStat().getFileSizeInBytes());
        }
    }

    /**
     * Points the read-optimized snapshot job conf at {@code basePath + "/" + str} and lists its files
     * with the non-realtime input format.
     *
     * @param str path relative to the table base path
     * @return the file statuses returned by {@code listStatus(roSnapshotJobConf, false)}
     * @throws Exception if listing fails
     */
    private FileStatus[] getROSnapshotFiles(String str) throws Exception {
        String inputPath = this.basePath + "/" + str;
        FileInputFormat.setInputPaths(this.roSnapshotJobConf, inputPath);
        return listStatus(this.roSnapshotJobConf, false);
    }

    /**
     * Convenience overload that lists read-optimized incremental files using start timestamp
     * {@code "000"} and a max-commits value of {@code -1}.
     *
     * @param str path relative to the table base path
     * @param z value forwarded as the stop-at-compaction flag
     * @return the file statuses produced by the four-argument overload
     * @throws Exception if listing fails
     */
    private FileStatus[] getROIncrementalFiles(String str, boolean z) throws Exception {
        final String startCommitTime = "000";
        final int maxCommits = -1;
        return getROIncrementalFiles(str, startCommitTime, maxCommits, z);
    }

    /**
     * Configures the read-optimized job conf for incremental consumption, points it at the given
     * path under the table base path, and lists its files with the non-realtime input format.
     *
     * @param str path relative to the table base path
     * @param str2 start timestamp for incremental consumption
     * @param i max-commits value for incremental consumption
     * @param z stop-at-compaction flag
     * @return the file statuses returned by {@code listStatus(roJobConf, false)}
     * @throws Exception if listing fails
     */
    private FileStatus[] getROIncrementalFiles(String str, String str2, int i, boolean z) throws Exception {
        setupIncremental(this.roJobConf, str2, i, z);
        String inputPath = Paths.get(this.basePath, str).toString();
        FileInputFormat.setInputPaths(this.roJobConf, inputPath);
        return listStatus(this.roJobConf, false);
    }

    /**
     * Convenience overload that lists realtime incremental files using start timestamp
     * {@code "000"} and a max-commits value of {@code -1}.
     *
     * @param str path relative to the table base path
     * @return the file statuses produced by the three-argument overload
     * @throws Exception if listing fails
     */
    private FileStatus[] getRTIncrementalFiles(String str) throws Exception {
        final String startCommitTime = "000";
        final int maxCommits = -1;
        return getRTIncrementalFiles(str, startCommitTime, maxCommits);
    }

    /**
     * Configures the realtime job conf for incremental consumption (stop-at-compaction disabled),
     * points it at the given path under the table base path, and lists its files with the realtime
     * input format.
     *
     * @param str path relative to the table base path
     * @param str2 start timestamp for incremental consumption
     * @param i max-commits value for incremental consumption
     * @return the file statuses returned by {@code listStatus(rtJobConf, true)}
     * @throws Exception if listing fails
     */
    private FileStatus[] getRTIncrementalFiles(String str, String str2, int i) throws Exception {
        setupIncremental(this.rtJobConf, str2, i, false);
        String inputPath = Paths.get(this.basePath, str).toString();
        FileInputFormat.setInputPaths(this.rtJobConf, inputPath);
        return listStatus(this.rtJobConf, true);
    }

    /**
     * Writes the incremental-consumption settings for the {@code raw_trips} table into {@code jobConf}.
     *
     * @param jobConf job configuration to mutate
     * @param str value stored under {@code hoodie.raw_trips.consume.start.timestamp}
     * @param i value stored under {@code hoodie.raw_trips.consume.max.commits}
     * @param z value stored under {@code hoodie.raw_trips.ro.stop.at.compaction}
     */
    private void setupIncremental(JobConf jobConf, String str, int i, boolean z) {
        final String tableName = "raw_trips";

        String consumeModeKey = String.format("hoodie.%s.consume.mode", tableName);
        String startTimestampKey = String.format("hoodie.%s.consume.start.timestamp", tableName);
        String maxCommitsKey = String.format("hoodie.%s.consume.max.commits", tableName);
        String stopAtCompactionKey = String.format("hoodie.%s.ro.stop.at.compaction", tableName);

        jobConf.set(consumeModeKey, "INCREMENTAL");
        jobConf.set(startTimestampKey, str);
        jobConf.setInt(maxCommitsKey, i);
        jobConf.setBoolean(stopAtCompactionKey, z);
    }

    /**
     * Validates a file listing and the records read back from a path under the table base path.
     *
     * <p>Asserts that {@code fileStatusArr} has {@code i} entries, that reading the path through the
     * input format yields {@code i2} records, and that the distinct {@code _hoodie_commit_time}
     * values of those records equal the set of {@code strArr}.
     *
     * @param str path relative to the table base path
     * @param i expected number of file statuses
     * @param fileStatusArr file statuses to check
     * @param z flag forwarded to {@code HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat}
     * @param jobConf job configuration used for reading the records
     * @param i2 expected number of records
     * @param strArr expected distinct commit times
     */
    private void validateFiles(String str, int i, FileStatus[] fileStatusArr, boolean z, JobConf jobConf, int i2, String... strArr) {
        Assertions.assertEquals(i, fileStatusArr.length);

        Set<String> expectedCommitTimes = Arrays.stream(strArr).collect(Collectors.toSet());

        String inputPath = Paths.get(this.basePath, str).toString();
        List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.hadoopConf, Collections.singletonList(inputPath), this.basePath, jobConf, z);
        Assertions.assertEquals(i2, records.size());

        Set<String> actualCommitTimes = records.stream()
                .map(rec -> rec.get("_hoodie_commit_time").toString())
                .collect(Collectors.toSet());
        Assertions.assertEquals(expectedCommitTimes, actualCommitTimes);
    }

    /**
     * Lists all base files of {@code hoodieTable} that carry the table's base file extension.
     *
     * @param hoodieTable table whose meta client and base file extension are used
     * @return the file statuses returned by {@code HoodieTestTable.listAllBaseFiles}
     * @throws IOException if listing fails
     */
    private FileStatus[] listAllBaseFilesInPath(HoodieTable hoodieTable) throws IOException {
        String baseFileExtension = hoodieTable.getBaseFileExtension();
        return HoodieTestTable.of(hoodieTable.getMetaClient()).listAllBaseFiles(baseFileExtension);
    }

    /**
     * Lists the input files visible to {@code jobConf} through the input format that
     * {@code HoodieInputFormatUtils.getInputFormat} returns for this test's base file format.
     *
     * <p>The returned input format is cast to a concrete class, chosen by base file format and by
     * {@code z}, before {@code listStatus(jobConf)} is invoked on it.
     *
     * @param jobConf job configuration handed to the input format
     * @param z selects the realtime input format classes when {@code true}, the plain ones otherwise
     * @return the file statuses reported by the selected input format
     * @throws IOException declared by this method; presumably propagated from the input format's listing
     */
    private FileStatus[] listStatus(JobConf jobConf, boolean z) throws IOException {
        // NOTE(review): the declared type looks narrower than intended - the branches below cast this
        // value to HFile input format classes as well; confirm against the original source.
        HoodieParquetRealtimeInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(this.baseFileFormat, z, jobConf);
        // NOTE(review): AnonymousClass1.$SwitchMap$... looks like a compiler-generated enum switch table
        // as rendered by a decompiler. Judging by the casts, case 1 presumably is the Parquet format and
        // case 2 the HFile format - TODO confirm and switch on the enum constant directly.
        switch (AnonymousClass1.$SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[this.baseFileFormat.ordinal()]) {
            case 1:
                return z ? inputFormat.listStatus(jobConf) : ((HoodieParquetInputFormat) inputFormat).listStatus(jobConf);
            case 2:
                return z ? ((HoodieHFileRealtimeInputFormat) inputFormat).listStatus(jobConf) : ((HoodieHFileInputFormat) inputFormat).listStatus(jobConf);
            default:
                // Any other base file format has no input format wired up in this helper.
                throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + this.baseFileFormat);
        }
    }

    /**
     * Rebuilds a serialized lambda of this class from its {@link SerializedLambda} description.
     *
     * <p>The implementation method name selects a candidate; the candidate is returned only if the
     * implementation kind, functional interface, interface method, signatures and implementing class
     * recorded in {@code serializedLambda} all match the expected values.
     *
     * <p>NOTE(review): the {@code synthetic} marker, the {@code boolean z = -1} selector and the
     * repeated {@code case true} labels suggest this is decompiler output of compiler-generated code
     * whose selector was an int (-1, 0, 1, 2, 3). As written it is not valid Java - confirm whether
     * this method should exist in the source file at all.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return the rebuilt lambda or method reference
     * @throws IllegalArgumentException if no candidate matches {@code serializedLambda}
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: hash of the name narrows the candidates, equals() confirms the exact name.
        switch (implMethodName.hashCode()) {
            case -1665645825:
                if (implMethodName.equals("lambda$testHandleUpdateWithMultiplePartitions$837c638a$1")) {
                    z = 3;
                    break;
                }
                break;
            case -985889156:
                if (implMethodName.equals("lambda$testSimpleInsertsGeneratedIntoLogFiles$768a501b$1")) {
                    z = false;
                    break;
                }
                break;
            case -842262550:
                if (implMethodName.equals("lambda$testInsertsGeneratedIntoLogFilesRollbackAfterCompaction$927ab29e$1")) {
                    z = 2;
                    break;
                }
                break;
            case -778804732:
                if (implMethodName.equals("flatten")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second stage: validate the recorded metadata for the selected candidate and rebuild it.
        // Implementation kind 6 is MethodHandleInfo.REF_invokeStatic.
        switch (z) {
            case false:
                // Candidate from testSimpleInsertsGeneratedIntoLogFiles: captures one String and tests
                // whether the write stat path contains it.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/TestHoodieMergeOnReadTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/hudi/client/WriteStatus;)Ljava/lang/Boolean;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return writeStatus -> {
                        return Boolean.valueOf(writeStatus.getStat().getPath().contains(str));
                    };
                }
                break;
            case true:
                // Method reference to Transformations.flatten (presumably selector value 1).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/List;")) {
                    return Transformations::flatten;
                }
                break;
            case true:
                // Presumably selector value 2 (testInsertsGeneratedIntoLogFilesRollbackAfterCompaction):
                // same shape as the first candidate - one captured String checked against the stat path.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/TestHoodieMergeOnReadTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/hudi/client/WriteStatus;)Ljava/lang/Boolean;")) {
                    String str2 = (String) serializedLambda.getCapturedArg(0);
                    return writeStatus2 -> {
                        return Boolean.valueOf(writeStatus2.getStat().getPath().contains(str2));
                    };
                }
                break;
            case true:
                // Presumably selector value 3 (testHandleUpdateWithMultiplePartitions): captures the
                // executor, two Strings and a List; the lambda's own argument is not used.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/TestHoodieMergeOnReadTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    AbstractSparkDeltaCommitActionExecutor abstractSparkDeltaCommitActionExecutor = (AbstractSparkDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    String str3 = (String) serializedLambda.getCapturedArg(1);
                    String str4 = (String) serializedLambda.getCapturedArg(2);
                    List list = (List) serializedLambda.getCapturedArg(3);
                    return num -> {
                        return abstractSparkDeltaCommitActionExecutor.handleUpdate(str3, str4, list.iterator());
                    };
                }
                break;
        }
        // Unknown method name, or the recorded metadata did not match the selected candidate.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
