package org.apache.hudi.table.action.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/table/action/compact/TestAsyncCompaction.class */
public class TestAsyncCompaction extends CompactionTestBase {
    /**
     * Builds the write config used by the tests in this class by calling
     * {@code getConfigBuilder(bool)} and building the result.
     *
     * @param bool flag forwarded unchanged to {@code getConfigBuilder};
     *             NOTE(review): presumably toggles auto-commit - confirm against the base class
     * @return the built {@link HoodieWriteConfig}
     */
    private HoodieWriteConfig getConfig(Boolean bool) {
        return getConfigBuilder(bool).build();
    }

    /**
     * Checks that rolling back an inflight compaction leaves a compaction instant with the same
     * timestamp in the REQUESTED state, and that the instant file on storage is not empty.
     *
     * @throws Exception if any write, scheduling or rollback step fails
     */
    @Test
    public void testRollbackForInflightCompaction() throws Exception {
        HoodieWriteConfig config = getConfig(false);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            String compactionInstantTime = "005";

            runNextDeltaCommits(hoodieWriteClient, getHoodieReadClient(config.getBasePath()), Arrays.asList("001", "004"),
                this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());
            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);

            // The freshly scheduled compaction must be the first pending compaction, in REQUESTED state.
            HoodieInstant pendingInstant =
                this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals(compactionInstantTime, pendingInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
            Assertions.assertEquals(HoodieInstant.State.REQUESTED, pendingInstant.getState(), "Pending Compaction instant has expected state");

            moveCompactionFromRequestedToInflight(compactionInstantTime, config);
            this.metaClient.reloadActiveTimeline();

            HoodieSparkTable.create(config, this.context, this.metaClient)
                .rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionInstantTime));
            this.metaClient.reloadActiveTimeline();

            // After the rollback the same instant must be pending again as a REQUESTED compaction.
            HoodieInstant rolledBackInstant = this.metaClient.getCommitsAndCompactionTimeline()
                .filterPendingCompactionTimeline().getInstantsAsStream().findFirst().get();
            Assertions.assertEquals("compaction", rolledBackInstant.getAction());
            Assertions.assertEquals(HoodieInstant.State.REQUESTED, rolledBackInstant.getState());
            Assertions.assertEquals(compactionInstantTime, rolledBackInstant.getTimestamp());

            // The instant file itself must have content (length > 0).
            Path instantFile = new Path(this.metaClient.getMetaPath(), rolledBackInstant.getFileName());
            Assertions.assertTrue(this.metaClient.getFs().getFileStatus(instantFile).getLen() > 0);
        }
    }

    /**
     * Checks that starting a new commit while an earlier delta commit is still pending leaves
     * exactly one pending non-compaction instant (the new one), and that the previously
     * scheduled compaction remains pending throughout.
     *
     * @throws Exception if any write or scheduling step fails
     */
    @Test
    public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
        HoodieWriteConfig config = getConfig(false);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            String compactionInstantTime = "005";
            String inflightInstantTime = "006";
            String nextInflightInstantTime = "007";

            List<HoodieRecord> records = runNextDeltaCommits(hoodieWriteClient, getHoodieReadClient(config.getBasePath()),
                Arrays.asList("001", "004"), this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());
            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);

            HoodieTableMetaClient metaClientForDeltaCommit =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            createNextDeltaCommit(inflightInstantTime, records, hoodieWriteClient, metaClientForDeltaCommit, config, true);

            // A fresh meta client is built so the assertions read the current timeline.
            HoodieTableMetaClient metaClientBefore =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieInstant pendingCompaction =
                metaClientBefore.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals(compactionInstantTime, pendingCompaction.getTimestamp(), "Pending Compaction instant has expected instant time");
            HoodieInstant pendingDeltaCommit =
                metaClientBefore.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
            Assertions.assertEquals(inflightInstantTime, pendingDeltaCommit.getTimestamp(), "inflight instant has expected instant time");

            hoodieWriteClient.startCommitWithTime(nextInflightInstantTime);

            HoodieTableMetaClient metaClientAfter =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieInstant onlyPendingInstant =
                metaClientAfter.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
            // Expected value goes first so a failure message reports expected/actual the right way round.
            Assertions.assertEquals(nextInflightInstantTime, onlyPendingInstant.getTimestamp(), "inflight instant has expected instant time");
            Assertions.assertEquals(1, metaClientAfter.getActiveTimeline().filterPendingExcludingCompaction().countInstants(), "Expect only one inflight instant");

            // The compaction scheduled earlier must still be pending.
            HoodieInstant stillPendingCompaction =
                metaClientAfter.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals(compactionInstantTime, stillPendingCompaction.getTimestamp(), "Pending Compaction instant has expected instant time");
        }
    }

    /**
     * Schedules a compaction, moves it to inflight, runs two more delta commits while it is
     * pending, and then executes the compaction.
     *
     * @throws Exception if any write, scheduling or compaction step fails
     */
    @Test
    public void testInflightCompaction() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            SparkRDDReadClient hoodieReadClient = getHoodieReadClient(config.getBasePath());
            String compactionInstantTime = "005";

            List<HoodieRecord> records = runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Arrays.asList("001", "004"),
                this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());

            // The table handle is created before the compaction is scheduled, as in the original flow.
            HoodieTableMetaClient tableMetaClient =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieSparkTable hoodieTable = getHoodieTable(tableMetaClient, config);

            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);
            moveCompactionFromRequestedToInflight(compactionInstantTime, config);

            // Further delta commits run while the compaction is listed as pending.
            runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Arrays.asList("006", "007"), records, config, false,
                Arrays.asList(compactionInstantTime));

            executeCompaction(compactionInstantTime, hoodieWriteClient, hoodieTable, config, 2000, true);
        }
    }

    /**
     * Checks that a compaction plan scheduled at "005" contains no delta file path mentioning
     * instant "002", and then executes that compaction.
     *
     * <p>NOTE(review): "002" is created via {@code createNextDeltaCommit(..., true)} and the
     * assertion message calls its log files "pending" - presumably the trailing {@code true}
     * leaves that delta commit uncommitted; confirm against the base class.
     *
     * @throws Exception if any write, scheduling or compaction step fails
     */
    @Test
    public void testConcurrentCompaction() throws Exception {
        HoodieWriteConfig config = getConfig(false);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            SparkRDDReadClient hoodieReadClient = getHoodieReadClient(config.getBasePath());
            String pendingDeltaInstantTime = "002";
            String compactionInstantTime = "005";

            List<HoodieRecord> records = runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Collections.singletonList("001"),
                this.dataGen.generateInserts("001", 2000), config, true, Collections.emptyList());

            this.metaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            createNextDeltaCommit(pendingDeltaInstantTime, this.dataGen.generateUpdates(pendingDeltaInstantTime, records.size()),
                hoodieWriteClient, this.metaClient, config, true);

            runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Arrays.asList("004", "006", "007"), records, config, false,
                Collections.emptyList());

            HoodieTableMetaClient tableMetaClient =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieSparkTable hoodieTable = getHoodieTable(tableMetaClient, config);

            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);
            moveCompactionFromRequestedToInflight(compactionInstantTime, config);

            // Read the plan back from the reloaded timeline and make sure none of its operations
            // references a delta file whose path contains the "002" instant time.
            boolean planSkipsPendingLogFiles = TimelineMetadataUtils.deserializeCompactionPlan(
                    tableMetaClient.reloadActiveTimeline()
                        .readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get())
                .getOperations().stream()
                .noneMatch(operation -> operation.getDeltaFilePaths().stream()
                    .anyMatch(deltaFilePath -> deltaFilePath.contains(pendingDeltaInstantTime)));
            Assertions.assertTrue(planSkipsPendingLogFiles, "compaction plan should not include pending log files");

            executeCompaction(compactionInstantTime, hoodieWriteClient, hoodieTable, config, 2000, true);
        }
    }

    /**
     * Schedules a compaction at an instant time generated by the write client, then verifies
     * that a delta commit at "005" can still run without throwing while that compaction is pending.
     *
     * @throws Exception if any write or scheduling step fails
     */
    @Test
    public void testScheduleIngestionBeforePendingCompaction() throws Exception {
        final HoodieWriteConfig writeConfig = getConfig(false);
        final SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        final SparkRDDReadClient readClient = getHoodieReadClient(writeConfig.getBasePath());
        final String deltaInstantTime = "005";
        // The compaction instant time is generated before the initial delta commits are written.
        final String compactionInstantTime = writeClient.createNewInstantTime();

        final List<HoodieRecord> records = runNextDeltaCommits(writeClient, readClient, Arrays.asList("001", "004"),
            this.dataGen.generateInserts("001", 2000), writeConfig, true, new ArrayList<>());
        scheduleCompaction(compactionInstantTime, writeClient, writeConfig);

        // A fresh meta client is built so the pending-compaction lookup reads the current timeline.
        HoodieTableMetaClient freshMetaClient =
            HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(writeConfig.getBasePath()).build();
        HoodieInstant pendingCompaction =
            freshMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
        Assertions.assertEquals(compactionInstantTime, pendingCompaction.getTimestamp(), "Pending Compaction instant has expected instant time");

        Assertions.assertDoesNotThrow(() -> {
            runNextDeltaCommits(writeClient, readClient, Arrays.asList(deltaInstantTime), records, writeConfig, false,
                Arrays.asList(compactionInstantTime));
        }, "Latest pending compaction instant time can be earlier than this instant time");
    }

    /**
     * Leaves a delta commit at "005" pending and verifies that a compaction can be scheduled
     * at the later instant "006" without throwing.
     *
     * @throws Exception if any write or scheduling step fails
     */
    @Test
    public void testScheduleCompactionAfterPendingIngestion() throws Exception {
        final HoodieWriteConfig writeConfig = getConfig(false);
        final SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        final SparkRDDReadClient readClient = getHoodieReadClient(writeConfig.getBasePath());
        final String compactionInstantTime = "006";
        final String inflightInstantTime = "005";

        List<HoodieRecord> records = runNextDeltaCommits(writeClient, readClient, Arrays.asList("001", "004"),
            this.dataGen.generateInserts("001", 2000), writeConfig, true, new ArrayList<>());
        HoodieTableMetaClient metaClientForDeltaCommit =
            HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(writeConfig.getBasePath()).build();
        createNextDeltaCommit(inflightInstantTime, records, writeClient, metaClientForDeltaCommit, writeConfig, true);

        // Re-read the timeline through a fresh meta client and confirm "005" is the pending instant.
        HoodieTableMetaClient freshMetaClient =
            HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(writeConfig.getBasePath()).build();
        HoodieInstant pendingInstant =
            freshMetaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
        Assertions.assertEquals(inflightInstantTime, pendingInstant.getTimestamp(), "inflight instant has expected instant time");

        Assertions.assertDoesNotThrow(() -> {
            scheduleCompaction(compactionInstantTime, writeClient, writeConfig);
        }, "Earliest ingestion inflight instant time can be smaller than the compaction time");
    }

    /**
     * After delta commits at "001" and "004", verifies that scheduling a compaction does not
     * throw for an older instant time ("002"), the same instant time as a delta commit ("004"),
     * or a greater instant time ("006").
     *
     * @throws Exception if the initial delta commits fail
     */
    @Test
    public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
        final HoodieWriteConfig writeConfig = getConfig(false);
        final SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        final String olderInstantTime = "002";
        final String sameInstantTime = "004";
        final String greaterInstantTime = "006";

        runNextDeltaCommits(writeClient, getHoodieReadClient(writeConfig.getBasePath()), Arrays.asList("001", "004"),
            this.dataGen.generateInserts("001", 2000), writeConfig, true, new ArrayList<>());

        // Older timestamp: goes through the test helper.
        Assertions.assertDoesNotThrow(() -> {
            scheduleCompaction(olderInstantTime, writeClient, writeConfig);
        }, "Compaction Instant can be scheduled with older timestamp");

        // Same timestamp as an existing delta commit: scheduled directly on the client.
        Assertions.assertDoesNotThrow(() -> {
            writeClient.scheduleCompactionAtInstant(sameInstantTime, Option.empty());
        }, "Compaction Instant to be scheduled can have same timestamp as committed instant");

        // Greater timestamp: scheduled directly on the client.
        Assertions.assertDoesNotThrow(() -> {
            writeClient.scheduleCompactionAtInstant(greaterInstantTime, Option.empty());
        }, "Compaction Instant can be scheduled with greater timestamp");
    }

    /**
     * Runs delta commits at "001" and "004", then schedules and executes a compaction at "005".
     *
     * @throws Exception if any write, scheduling or compaction step fails
     */
    @Test
    public void testCompactionAfterTwoDeltaCommits() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            runNextDeltaCommits(hoodieWriteClient, getHoodieReadClient(config.getBasePath()), Arrays.asList("001", "004"),
                this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());

            // The table handle is built from a fresh meta client after the delta commits.
            HoodieTableMetaClient tableMetaClient =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieSparkTable hoodieTable = getHoodieTable(tableMetaClient, config);

            scheduleAndExecuteCompaction("005", hoodieWriteClient, hoodieTable, config, 2000, false);
        }
    }

    /**
     * Schedules a compaction at "005", runs delta commits at "006" and "007" while it is still
     * pending, and then executes the compaction.
     *
     * @throws Exception if any write, scheduling or compaction step fails
     */
    @Test
    public void testInterleavedCompaction() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            SparkRDDReadClient hoodieReadClient = getHoodieReadClient(config.getBasePath());
            String compactionInstantTime = "005";

            List<HoodieRecord> records = runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Arrays.asList("001", "004"),
                this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());

            // The table handle is created before the compaction is scheduled, as in the original flow.
            HoodieTableMetaClient tableMetaClient =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieSparkTable hoodieTable = getHoodieTable(tableMetaClient, config);

            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);

            // Delta commits interleave with the pending (not yet executed) compaction.
            runNextDeltaCommits(hoodieWriteClient, hoodieReadClient, Arrays.asList("006", "007"), records, config, false,
                Arrays.asList(compactionInstantTime));

            executeCompaction(compactionInstantTime, hoodieWriteClient, hoodieTable, config, 2000, true);
        }
    }

    /**
     * Schedules a compaction at "005", then overwrites the table contents with an
     * insert-overwrite at "006", checks that none of the file groups seen before the overwrite
     * are among the latest file groups afterwards, and finally executes the compaction against
     * the replaced file groups.
     *
     * @throws Exception if any write, scheduling or compaction step fails
     */
    @Test
    public void testCompactionOnReplacedFiles() throws Exception {
        HoodieWriteConfig config = getConfig(true);
        // try-with-resources closes the client on every exit path; a failure in close() is
        // attached as a suppressed exception rather than masking the original failure.
        try (SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config)) {
            String compactionInstantTime = "005";
            String replaceInstantTime = "006";

            runNextDeltaCommits(hoodieWriteClient, getHoodieReadClient(config.getBasePath()), Arrays.asList("001", "004"),
                this.dataGen.generateInserts("001", 2000), config, true, new ArrayList<>());

            HoodieTableMetaClient tableMetaClient =
                HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(config.getBasePath()).build();
            HoodieSparkTable hoodieTable = getHoodieTable(tableMetaClient, config);

            scheduleCompaction(compactionInstantTime, hoodieWriteClient, config);
            tableMetaClient.reloadActiveTimeline();
            HoodieInstant pendingCompaction =
                tableMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals(compactionInstantTime, pendingCompaction.getTimestamp(), "Pending Compaction instant has expected instant time");

            // Snapshot the file groups before the overwrite so they can be compared afterwards.
            Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, this.dataGen.getPartitionPaths());

            JavaRDD<HoodieRecord> replacementRecords =
                this.jsc.parallelize(this.dataGen.generateInserts(replaceInstantTime, 2000), 1);
            hoodieWriteClient.startCommitWithTime(replaceInstantTime, "replacecommit");
            hoodieWriteClient.insertOverwrite(replacementRecords, replaceInstantTime);

            tableMetaClient.reloadActiveTimeline();
            HoodieSparkTable tableAfterReplace = getHoodieTable(tableMetaClient, config);

            // None of the pre-overwrite file groups may show up among the latest file groups.
            long survivingFileGroups = getAllFileGroups(tableAfterReplace, this.dataGen.getPartitionPaths()).stream()
                .filter(fileGroupsBeforeReplace::contains)
                .count();
            Assertions.assertEquals(0L, survivingFileGroups);

            executeCompactionWithReplacedFiles(compactionInstantTime, hoodieWriteClient, tableAfterReplace, config,
                this.dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
        }
    }

    /**
     * Collects the file group ids of the latest file slices across the given partitions.
     *
     * @param hoodieTable table whose slice view is queried
     * @param strArr      partition paths to look up
     * @return the set of file group ids found in the latest file slices of those partitions
     */
    private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable hoodieTable, String[] strArr) {
        return Arrays.stream(strArr)
            .flatMap(partitionPath -> hoodieTable.getSliceView().getLatestFileSlices(partitionPath))
            .map(latestSlice -> latestSlice.getFileGroupId())
            .collect(Collectors.toSet());
    }
}
