package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.TestCleaner;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.class */
public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTestHarness {
    private static final Logger LOG = LoggerFactory.getLogger(TestCleanerInsertAndCleanByCommits.class);
    private static final int BATCH_SIZE = 100;
    private static final int PARALLELISM = 2;

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInsertAndCleanByCommits(boolean z) throws Exception {
        testInsertAndCleanByCommits((v0, v1, v2) -> {
            return v0.insert(v1, v2);
        }, (v0, v1, v2) -> {
            return v0.upsert(v1, v2);
        }, false, z);
    }

    @Test
    public void testInsertPreppedAndCleanByCommits() throws Exception {
        testInsertAndCleanByCommits((v0, v1, v2) -> {
            return v0.insertPreppedRecords(v1, v2);
        }, (v0, v1, v2) -> {
            return v0.upsertPreppedRecords(v1, v2);
        }, true, false);
    }

    @Test
    public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
        testInsertAndCleanByCommits((sparkRDDWriteClient, javaRDD, str) -> {
            return sparkRDDWriteClient.bulkInsertPreppedRecords(javaRDD, str, Option.empty());
        }, (v0, v1, v2) -> {
            return v0.upsertPreppedRecords(v1, v2);
        }, true, false);
    }

    @Test
    public void testBulkInsertAndCleanByCommits() throws Exception {
        testInsertAndCleanByCommits((v0, v1, v2) -> {
            return v0.bulkInsert(v1, v2);
        }, (v0, v1, v2) -> {
            return v0.upsert(v1, v2);
        }, false, false);
    }

    private void testInsertAndCleanByCommits(HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function3, HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> function32, boolean z, boolean z2) throws Exception {
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2;
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function22;
        HoodieWriteConfig build = getConfigBuilder(true).withCleanConfig(HoodieCleanConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(Boolean.valueOf(z2)).retainCommits(3).build()).withParallelism(PARALLELISM, PARALLELISM).withBulkInsertParallelism(PARALLELISM).withFinalizeWriteParallelism(PARALLELISM).withDeleteParallelism(PARALLELISM).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()).build();
        SparkRDDWriteClient hoodieWriteClient = m58getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator(System.nanoTime());
                if (z) {
                    String basePath = basePath();
                    Configuration hadoopConf = hadoopConf();
                    HoodieSparkEngineContext context = m57context();
                    hoodieTestDataGenerator.getClass();
                    function2 = HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, build, hoodieTestDataGenerator::generateInserts);
                } else {
                    hoodieTestDataGenerator.getClass();
                    function2 = hoodieTestDataGenerator::generateInserts;
                }
                HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function23 = function2;
                if (z) {
                    String basePath2 = basePath();
                    Configuration hadoopConf2 = hadoopConf();
                    HoodieSparkEngineContext context2 = m57context();
                    hoodieTestDataGenerator.getClass();
                    function22 = HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(basePath2, hadoopConf2, context2, build, hoodieTestDataGenerator::generateUniqueUpdates);
                } else {
                    hoodieTestDataGenerator.getClass();
                    function22 = hoodieTestDataGenerator::generateUniqueUpdates;
                }
                HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function24 = function22;
                HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
                TestCleaner.insertFirstBigBatchForClientCleanerTest(m57context(), hoodieMetaClient, hoodieWriteClient, function23, function3);
                HashMap hashMap = new HashMap();
                for (int i = 0; i < 8; i++) {
                    String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
                    hoodieWriteClient.startCommitWithTime(makeNewCommitTime);
                    List collect = ((JavaRDD) function32.apply(hoodieWriteClient, jsc().parallelize((List) function24.apply(makeNewCommitTime, Integer.valueOf(BATCH_SIZE)), PARALLELISM), makeNewCommitTime)).collect();
                    Assertions.assertNoWriteErrors(collect);
                    hashMap.put(makeNewCommitTime, collect.stream().map((v0) -> {
                        return v0.getStat();
                    }).collect(Collectors.toList()));
                    hoodieMetaClient = HoodieTableMetaClient.reload(hoodieMetaClient);
                    validateFilesAfterCleaning(HoodieSparkTable.create(build, m57context(), hoodieMetaClient), hashMap, hoodieTestDataGenerator.getPartitionPaths());
                }
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    private void validateFilesAfterCleaning(HoodieTable hoodieTable, Map<String, List<HoodieWriteStat>> map, String[] strArr) {
        org.junit.jupiter.api.Assertions.assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, hoodieTable.getConfig().getCleanerPolicy());
        boolean isAsyncClean = hoodieTable.getConfig().isAsyncClean();
        int cleanerCommitsRetained = hoodieTable.getConfig().getCleanerCommitsRetained();
        HoodieTimeline completedCommitsTimeline = hoodieTable.getCompletedCommitsTimeline();
        HoodieInstant hoodieInstant = (HoodieInstant) completedCommitsTimeline.lastInstant().get();
        if (isAsyncClean) {
            completedCommitsTimeline = completedCommitsTimeline.findInstantsBefore(hoodieInstant.getTimestamp());
        }
        Option nthFromLastInstant = completedCommitsTimeline.nthFromLastInstant(cleanerCommitsRetained - 1);
        HoodieTimeline hoodieTimeline = completedCommitsTimeline;
        HashMap hashMap = new HashMap();
        TableFileSystemView fileSystemView = hoodieTable.getFileSystemView();
        HashSet hashSet = new HashSet();
        for (String str : strArr) {
            hashSet.addAll((Collection) fileSystemView.getAllFileGroups(str).map(hoodieFileGroup -> {
                return Pair.of(str, hoodieFileGroup.getFileGroupId().getFileId());
            }).collect(Collectors.toList()));
        }
        for (HoodieInstant hoodieInstant2 : (List) completedCommitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
            map.computeIfAbsent(hoodieInstant2.getTimestamp(), str2 -> {
                try {
                    return ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) hoodieTimeline.getInstantDetails((HoodieInstant) hoodieTimeline.filter(hoodieInstant3 -> {
                        return hoodieInstant3.getTimestamp().equals(str2);
                    }).firstInstant().get()).get(), HoodieCommitMetadata.class)).getWriteStats();
                } catch (IOException e) {
                    return Collections.EMPTY_LIST;
                }
            }).forEach(hoodieWriteStat -> {
                Pair of = Pair.of(hoodieWriteStat.getPartitionPath(), hoodieWriteStat.getFileId());
                if (hashSet.contains(of)) {
                    if (nthFromLastInstant.isPresent() && HoodieTimeline.compareTimestamps(hoodieInstant2.getTimestamp(), HoodieTimeline.LESSER_THAN, ((HoodieInstant) nthFromLastInstant.get()).getTimestamp())) {
                        hashSet.remove(of);
                    }
                    ((Set) hashMap.computeIfAbsent(of, pair -> {
                        return new HashSet();
                    })).add(hoodieInstant2.getTimestamp());
                }
            });
            if (hashSet.isEmpty()) {
                break;
            }
        }
        for (String str3 : strArr) {
            for (HoodieFileGroup hoodieFileGroup2 : (List) fileSystemView.getAllFileGroups(str3).collect(Collectors.toList())) {
                HashSet hashSet2 = new HashSet();
                hoodieFileGroup2.getAllBaseFiles().forEach(hoodieBaseFile -> {
                    LOG.debug("Data File - " + hoodieBaseFile);
                    hashSet2.add(hoodieBaseFile.getCommitTime());
                });
                if (isAsyncClean) {
                    hashSet2.remove(hoodieInstant.getTimestamp());
                }
                org.junit.jupiter.api.Assertions.assertEquals(hashMap.get(Pair.of(str3, hoodieFileGroup2.getFileGroupId().getFileId())), hashSet2, "Only contain acceptable versions of file should be present");
            }
        }
    }
}
