package org.apache.hudi.table.functional;

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Functional tests that insert, update, delete and compact records in a
 * {@link HoodieTableType#MERGE_ON_READ} table through a {@link SparkRDDWriteClient}.
 *
 * <p>Every test obtains its write client in a try-with-resources statement so the client is
 * closed on both success and failure, and a failure raised by {@code close()} is attached to
 * the primary failure as a suppressed exception instead of replacing it.
 *
 * <p>The write client is deliberately used as a raw type, so its results (RDDs, options) are
 * raw as well; casts are kept only where that raw usage requires them.
 */
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClientFunctionalTestHarness {

    /** Small-file size limit handed to the compaction config: 1 GiB (1073741824 bytes). */
    private static final long COMPACTION_SMALL_FILE_SIZE_BYTES = 1024L * 1024 * 1024;

    /**
     * Argument source for {@link #testSimpleInsertAndUpdate(HoodieFileFormat, boolean)}.
     * JUnit resolves it by name because the test's {@code @MethodSource} has no explicit value.
     *
     * @return (base file format, populate-meta-fields flag) combinations to run
     */
    private static Stream<Arguments> testSimpleInsertAndUpdate() {
        return Stream.of(
            Arguments.of(HoodieFileFormat.PARQUET, true),
            Arguments.of(HoodieFileFormat.PARQUET, false),
            Arguments.of(HoodieFileFormat.HFILE, true));
    }

    /**
     * Inserts 200 records, updates 100 of them, compacts, and checks that exactly one completed
     * commit exists afterwards and that 200 records are readable.
     *
     * @param baseFileFormat base file format configured on the table
     * @param populateMetaFields value passed to {@code addConfigsForPopulateMetaFields}; when
     *     {@code false} the table properties come from {@code getPropertiesForKeyGen()}
     */
    @MethodSource
    @ParameterizedTest
    public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat, boolean populateMetaFields) throws Exception {
        Properties tableProps = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
        HoodieTableMetaClient metaClient = morMetaClientWithBaseFormat(tableProps, baseFileFormat);

        HoodieWriteConfig.Builder configBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
        HoodieWriteConfig writeConfig = configBuilder.build();

        try (SparkRDDWriteClient client = m55getHoodieWriteClient(writeConfig)) {
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

            // First delta commit: plain inserts.
            String insertInstant = "001";
            client.startCommitWithTime(insertInstant);
            Assertions.assertTrue(
                insertRecordsToMORTable(metaClient, dataGen.generateInserts(insertInstant, 200), client, writeConfig, insertInstant)
                    .findAny()
                    .isPresent(),
                "should list the base files we wrote in the delta commit");

            // Second delta commit: updates.
            String updateInstant = "004";
            client.startCommitWithTime(updateInstant);
            updateRecordsInMORTable(metaClient, dataGen.generateUpdates(updateInstant, 100), client, writeConfig, updateInstant, false);

            // Schedule a compaction and run it immediately.
            String compactionInstant = client.scheduleCompaction(Option.empty()).get().toString();
            client.compact(compactionInstant);

            HoodieSparkTable table = HoodieSparkTable.create(writeConfig, m54context(), metaClient);
            table.getHoodieView().sync();
            HoodieTableFileSystemView fsView =
                getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), listAllBaseFilesInPath(table));
            Assertions.assertTrue(fsView.getLatestBaseFiles().findAny().isPresent());

            HoodieTimeline completedCommits = HoodieTableMetaClient.reload(metaClient).getCommitTimeline().filterCompletedInstants();
            Assertions.assertEquals(
                1, completedCommits.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
            String latestCommitTime = completedCommits.lastInstant().get().getTimestamp();
            Assertions.assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCommitTime));

            if (writeConfig.populateMetaFields()) {
                Assertions.assertEquals(
                    200L,
                    HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), completedCommits, Option.of("000")),
                    "Must contain 200 records");
            } else {
                Assertions.assertEquals(
                    200L,
                    HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), completedCommits, Option.empty()));
            }
        }
    }

    /**
     * Writes an insert and an update delta commit with inline compaction execution disabled and
     * checks that a pending compaction shows up on the timeline only when inline scheduling is on.
     *
     * @param scheduleInlineCompaction value passed to {@code withScheduleInlineCompaction}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throws Exception {
        HoodieTableMetaClient metaClient = morMetaClientWithBaseFormat(new Properties(), HoodieFileFormat.PARQUET);
        HoodieWriteConfig writeConfig = nonInlineCompactionConfig(scheduleInlineCompaction);

        try (SparkRDDWriteClient client = m55getHoodieWriteClient(writeConfig)) {
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

            String insertInstant = "001";
            client.startCommitWithTime(insertInstant);
            Assertions.assertTrue(
                insertRecordsToMORTable(metaClient, dataGen.generateInserts(insertInstant, 200), client, writeConfig, insertInstant, true)
                    .findAny()
                    .isPresent(),
                "should list the base files we wrote in the delta commit");

            String updateInstant = "004";
            client.startCommitWithTime(updateInstant);
            updateRecordsInMORTable(metaClient, dataGen.generateUpdates(updateInstant, 100), client, writeConfig, updateInstant, true);

            int expectedPendingCompactions = scheduleInlineCompaction ? 1 : 0;
            int pendingCompactions =
                metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingCompactionTimeline().countInstants();
            Assertions.assertEquals(expectedPendingCompactions, pendingCompactions);
        }
    }

    /**
     * Runs the same scheduled compaction instant several times, deletes the latest rollback
     * commit in between, and checks that the latest rollback instant observed after the final
     * compaction attempt has the same timestamp as the one observed before the deletion.
     */
    @Test
    public void testRepeatedRollbackOfCompaction() throws Exception {
        HoodieTableMetaClient metaClient = morMetaClientWithBaseFormat(new Properties(), HoodieFileFormat.PARQUET);
        HoodieWriteConfig writeConfig = nonInlineCompactionConfig(false);

        try (SparkRDDWriteClient client = m55getHoodieWriteClient(writeConfig)) {
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

            String insertInstant = "001";
            client.startCommitWithTime(insertInstant);
            Assertions.assertTrue(
                insertRecordsToMORTable(metaClient, dataGen.generateInserts(insertInstant, 200), client, writeConfig, insertInstant, true)
                    .findAny()
                    .isPresent(),
                "should list the base files we wrote in the delta commit");

            String updateInstant = "004";
            client.startCommitWithTime(updateInstant);
            updateRecordsInMORTable(metaClient, dataGen.generateUpdates(updateInstant, 100), client, writeConfig, updateInstant, true);

            // The cast is required because the client is raw, which makes the returned Option raw.
            String compactionInstant = (String) client.scheduleCompaction(Option.empty()).get();
            client.compact(compactionInstant);
            // Trigger the very same compaction instant a second time.
            client.compact(compactionInstant);

            metaClient.reloadActiveTimeline();
            HoodieInstant rollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();

            // Drop everything up to and including the first ':' of the base path before handing it
            // to FileCreateUtils.
            String tableBasePath = metaClient.getBasePath();
            FileCreateUtils.deleteRollbackCommit(
                tableBasePath.substring(tableBasePath.indexOf(":") + 1), rollbackInstant.getTimestamp());
            metaClient.reloadActiveTimeline();

            // A second client is requested from the harness and, as in the original test, is not
            // closed here. NOTE(review): presumably the harness tracks and closes it - confirm.
            SparkRDDWriteClient secondClient = m55getHoodieWriteClient(writeConfig);
            secondClient.compact(compactionInstant);
            metaClient.reloadActiveTimeline();

            HoodieInstant latestRollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
            Assertions.assertEquals(rollbackInstant.getTimestamp(), latestRollbackInstant.getTimestamp());
        }
    }

    /**
     * Upserts 20 records, updates all of them, then deletes all of them, and checks that the
     * latest base files yield zero records when read through the input format.
     *
     * @param populateMetaFields value passed to {@code addConfigsForPopulateMetaFields} and to the
     *     input-format reader; when {@code false} the table properties come from
     *     {@code getPropertiesForKeyGen()}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
        Properties tableProps = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
        HoodieTableMetaClient metaClient =
            morMetaClientWithBaseFormat(tableProps, HoodieTableConfig.BASE_FILE_FORMAT.defaultValue());

        HoodieWriteConfig.Builder configBuilder = getConfigBuilder(true);
        addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
        HoodieWriteConfig writeConfig = configBuilder.build();

        try (SparkRDDWriteClient client = m55getHoodieWriteClient(writeConfig)) {
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

            // Delta commit 001: inserts.
            String insertInstant = "001";
            client.startCommitWithTime(insertInstant);
            List<HoodieRecord> inserts = dataGen.generateInserts(insertInstant, 20);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                client.upsert(jsc().parallelize(inserts, 1), insertInstant).collect());

            HoodieSparkTable table = HoodieSparkTable.create(writeConfig, m54context(), metaClient);

            Option<HoodieInstant> firstDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            Assertions.assertTrue(firstDeltaCommit.isPresent());
            Assertions.assertEquals("001", firstDeltaCommit.get().getTimestamp(), "Delta commit should be 001");
            Assertions.assertFalse(metaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            // No base file is visible through the commit timeline, but the delta commit's base
            // files are visible through the table's completed-commits timeline.
            FileStatus[] baseFiles = listAllBaseFilesInPath(table);
            HoodieTableFileSystemView commitOnlyView =
                getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), baseFiles);
            Assertions.assertFalse(commitOnlyView.getLatestBaseFiles().findAny().isPresent());
            HoodieTableFileSystemView completedView =
                getHoodieTableFileSystemView(metaClient, table.getCompletedCommitsTimeline(), baseFiles);
            Assertions.assertTrue(
                completedView.getLatestBaseFiles().findAny().isPresent(),
                "should list the base files we wrote in the delta commit");

            // Delta commit 002: update every inserted record.
            String updateInstant = "002";
            client.startCommitWithTime(updateInstant);
            List<HoodieRecord> updates = dataGen.generateUpdates(updateInstant, inserts);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                client.upsert(jsc().parallelize(updates, 1), updateInstant).collect());

            // Delta commit 004: delete every updated record.
            String deleteInstant = "004";
            client.startCommitWithTime(deleteInstant);
            List<HoodieRecord> deletes = dataGen.generateDeletesFromExistingRecords(updates);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                client.upsert(jsc().parallelize(deletes, 1), deleteInstant).collect());

            HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
            Option<HoodieInstant> lastDeltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(lastDeltaCommit.isPresent());
            Assertions.assertEquals("004", lastDeltaCommit.get().getTimestamp(), "Latest Delta commit should be 004");
            Assertions.assertFalse(reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());

            HoodieTableFileSystemView latestView =
                getHoodieTableFileSystemView(reloadedMetaClient, table.getCompletedCommitsTimeline(), listAllBaseFilesInPath(table));
            Assertions.assertTrue(latestView.getLatestBaseFiles().findAny().isPresent());

            // Read back from the parent directory of every latest base file.
            List<String> inputDirs = latestView.getLatestBaseFiles()
                .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
                .collect(Collectors.toList());
            int recordCount = HoodieMergeOnReadTestUtils
                .getRecordsUsingInputFormat(hadoopConf(), inputDirs, basePath(), new JobConf(hadoopConf()), true, populateMetaFields)
                .size();
            Assertions.assertEquals(0, recordCount, "Must contain 0 records");
        }
    }

    /**
     * Inserts 100 records with the {@code INMEMORY} index and checks that the latest file slices
     * carry log files but no base files, then compacts and checks that the compaction produced
     * one base-file write stat per file slice that had log files.
     */
    @Test
    public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
        HoodieWriteConfig writeConfig = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).build();
        HoodieTableMetaClient metaClient =
            morMetaClientWithBaseFormat(new Properties(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue());

        try (SparkRDDWriteClient client = m55getHoodieWriteClient(writeConfig)) {
            String insertInstant = "100";
            client.startCommitWithTime(insertInstant);
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            client.commit(insertInstant, client.insert(jsc().parallelize(dataGen.generateInserts(insertInstant, 100), 1), insertInstant));

            HoodieSparkTable table = HoodieSparkTable.create(writeConfig, m54context(), metaClient);
            table.getHoodieView().sync();
            TableFileSystemView.SliceView sliceView = table.getSliceView();

            long totalSlicesWithLogFiles = 0;
            for (String partitionPath : dataGen.getPartitionPaths()) {
                List<FileSlice> slices = sliceView.getLatestFileSlices(partitionPath).collect(Collectors.toList());

                long slicesWithBaseFile = slices.stream().filter(slice -> slice.getBaseFile().isPresent()).count();
                Assertions.assertEquals(0L, slicesWithBaseFile);
                Assertions.assertTrue(
                    slices.stream().anyMatch(TestHoodieSparkMergeOnReadTableInsertUpdateDelete::hasLogFiles));

                long slicesWithLogFiles =
                    slices.stream().filter(TestHoodieSparkMergeOnReadTableInsertUpdateDelete::hasLogFiles).count();
                if (slicesWithLogFiles > 0) {
                    // The first log file of every slice must carry the base log version.
                    Assertions.assertTrue(slices.stream()
                        .map(slice -> slice.getLogFiles().findFirst().get().getLogVersion())
                        .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
                }
                totalSlicesWithLogFiles += slicesWithLogFiles;
            }
            Assertions.assertTrue(totalSlicesWithLogFiles > 0);

            String compactionInstant = client.scheduleCompaction(Option.empty()).get().toString();
            HoodieWriteMetadata<?> compactionResult = client.compact(compactionInstant);
            String baseFileExtension = table.getBaseFileExtension();
            HoodieCommitMetadata compactionMetadata = compactionResult.getCommitMetadata().get();
            Collection<List<HoodieWriteStat>> writeStatsPerPartition = compactionMetadata.getPartitionToWriteStats().values();

            long baseFileWriteStats = writeStatsPerPartition.stream()
                .flatMap(List::stream)
                .filter(writeStat -> writeStat.getPath().contains(baseFileExtension))
                .count();
            Assertions.assertEquals(totalSlicesWithLogFiles, baseFileWriteStats);

            long totalWriteStats = writeStatsPerPartition.stream().mapToLong(List::size).sum();
            Assertions.assertEquals(totalSlicesWithLogFiles, totalWriteStats);

            client.commitCompaction(compactionInstant, compactionMetadata, Option.empty());
        }
    }

    /** Returns whether the given file slice has at least one log file. */
    private static boolean hasLogFiles(FileSlice slice) {
        return slice.getLogFiles().count() > 0;
    }

    /** Sets the base file format on {@code props} and creates a MERGE_ON_READ meta client from them. */
    private HoodieTableMetaClient morMetaClientWithBaseFormat(Properties props, HoodieFileFormat baseFileFormat) throws Exception {
        props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
        return getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
    }

    /** Builds the write config shared by the compaction tests: inline compaction execution off, scheduling as requested. */
    private HoodieWriteConfig nonInlineCompactionConfig(boolean scheduleInlineCompaction) throws Exception {
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(COMPACTION_SMALL_FILE_SIZE_BYTES)
            .withInlineCompaction(false)
            .withMaxNumDeltaCommitsBeforeCompaction(2)
            .withPreserveCommitMetadata(true)
            .withScheduleInlineCompaction(scheduleInlineCompaction)
            .build();
        return getConfigBuilder(false).withCompactionConfig(compactionConfig).build();
    }
}
