package org.apache.hudi.io;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieTestCommitGenerator;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.LSMTimelineWriter;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.LSMTimeline;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/io/TestHoodieTimelineArchiver.class */
public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness {
    /** Class-scoped SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTimelineArchiver.class);
    /** Hadoop configuration taken from the engine context in {@link #init(HoodieTableType)}. */
    private Configuration hadoopConf;
    /** File system obtained from the meta client in {@link #init(HoodieTableType)}. */
    private HoodieWrapperFileSystem wrapperFs;
    /** Metadata-table writer; only assigned when a test enables the metadata table, closed in {@link #clean()}. */
    private HoodieTableMetadataWriter metadataWriter;
    /** Test-table fixture assigned by {@code initWriteConfigAndMetatableWriter} and used to issue write operations. */
    private HoodieTestTable testTable;

    /**
     * Sets up the harness using a copy-on-write table.
     *
     * @throws Exception if any of the harness initialisation steps fails
     */
    public void init() throws Exception {
        final HoodieTableType defaultTableType = HoodieTableType.COPY_ON_WRITE;
        init(defaultTableType);
    }

    /**
     * Sets up the harness (path, Spark contexts, timeline service, meta client) and then
     * re-initialises the meta client for the requested table type.
     *
     * @param hoodieTableType table type the meta client is initialised with
     * @throws Exception if any of the harness initialisation steps fails
     */
    public void init(HoodieTableType hoodieTableType) throws Exception {
        // Harness bootstrap; the order of these calls is kept exactly as it was.
        initPath();
        initSparkContexts();
        initTimelineService();
        initMetaClient();

        hadoopConf = context.getHadoopConf().get();

        // Make sure the base path exists before the meta client is re-created for the given type.
        final Path tableRoot = new Path(basePath);
        metaClient.getFs().mkdirs(tableRoot);
        metaClient = HoodieTestUtils.init(hadoopConf, basePath, hoodieTableType);

        wrapperFs = metaClient.getFs();
        hadoopConf.addResource(wrapperFs.getConf());
    }

    /**
     * Creates the {@code testTable} fixture, backed by a metadata-table writer when requested.
     *
     * @param hoodieWriteConfig write config handed to the metadata-table writer
     * @param z                 {@code true} to create a metadata writer and a metadata-aware test table,
     *                          {@code false} to create a plain test table
     */
    private void initWriteConfigAndMetatableWriter(HoodieWriteConfig hoodieWriteConfig, boolean z) {
        if (z) {
            metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, hoodieWriteConfig, context);
            // Reload so the meta client reflects whatever the writer creation changed on storage.
            metaClient = HoodieTableMetaClient.reload(metaClient);
            testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
        } else {
            testTable = HoodieTestTable.of(metaClient);
        }
    }

    /**
     * Releases the harness resources and then closes the metadata-table writer, if one was created.
     *
     * <p>The writer is bound as a try-with-resources resource so that it is closed even when
     * {@code cleanupResources()} throws; a {@code null} resource is skipped by the language, and a
     * failure while closing is attached as a suppressed exception instead of hiding the first one.
     *
     * @throws Exception if the cleanup or closing the writer fails
     */
    @AfterEach
    public void clean() throws Exception {
        // The local name is never read: its only purpose is to schedule close() after the body.
        try (HoodieTableMetadataWriter writerToClose = this.metadataWriter) {
            cleanupResources();
        }
    }

    /**
     * Convenience overload that uses a copy-on-write table.
     *
     * @param z  whether the metadata table is enabled
     * @param i  first argument of {@code archiveCommitsWith}
     * @param i2 second argument of {@code archiveCommitsWith}
     * @param i3 forwarded unchanged as the fourth argument of the table-type overload
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3) throws Exception {
        final HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
        return initTestTableAndGetWriteConfig(z, i, i2, i3, tableType);
    }

    /**
     * Overload that fixes the timeline compaction batch size to 10 and uses the EAGER
     * failed-writes cleaning policy with a single writer.
     *
     * @param z               whether the metadata table is enabled
     * @param i               first argument of {@code archiveCommitsWith}
     * @param i2              second argument of {@code archiveCommitsWith}
     * @param i3              max delta commits before compaction (compaction config)
     * @param i4              max delta commits before compaction (metadata config)
     * @param hoodieTableType table type to initialise
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3, int i4, HoodieTableType hoodieTableType) throws Exception {
        final int timelineCompactionBatchSize = 10;
        final HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        final WriteConcurrencyMode concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER;
        return initTestTableAndGetWriteConfig(z, i, i2, i3, i4, hoodieTableType, timelineCompactionBatchSize, cleaningPolicy, concurrencyMode);
    }

    /**
     * Overload that fixes the compaction-config delta-commit threshold to 5 and the timeline
     * compaction batch size to 10, with the EAGER failed-writes cleaning policy and a single writer.
     *
     * @param z               whether the metadata table is enabled
     * @param i               first argument of {@code archiveCommitsWith}
     * @param i2              second argument of {@code archiveCommitsWith}
     * @param i3              max delta commits before compaction (metadata config)
     * @param hoodieTableType table type to initialise
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3, HoodieTableType hoodieTableType) throws Exception {
        final int maxDeltaCommitsBeforeCompaction = 5;
        final int timelineCompactionBatchSize = 10;
        final HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        final WriteConcurrencyMode concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER;
        return initTestTableAndGetWriteConfig(z, i, i2, maxDeltaCommitsBeforeCompaction, i3, hoodieTableType, timelineCompactionBatchSize, cleaningPolicy, concurrencyMode);
    }

    /**
     * Overload for a copy-on-write table that fixes the compaction-config delta-commit threshold
     * to 5, with the EAGER failed-writes cleaning policy and a single writer.
     *
     * @param z  whether the metadata table is enabled
     * @param i  first argument of {@code archiveCommitsWith}
     * @param i2 second argument of {@code archiveCommitsWith}
     * @param i3 max delta commits before compaction (metadata config)
     * @param i4 timeline compaction batch size
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3, int i4) throws Exception {
        final int maxDeltaCommitsBeforeCompaction = 5;
        final HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
        final HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        final WriteConcurrencyMode concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER;
        return initTestTableAndGetWriteConfig(z, i, i2, maxDeltaCommitsBeforeCompaction, i3, tableType, i4, cleaningPolicy, concurrencyMode);
    }

    /**
     * Overload that takes the archive-beyond-savepoint flag from the default value of
     * {@code HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT}.
     *
     * @param z                                whether the metadata table is enabled
     * @param i                                first argument of {@code archiveCommitsWith}
     * @param i2                               second argument of {@code archiveCommitsWith}
     * @param i3                               max delta commits before compaction (compaction config)
     * @param i4                               max delta commits before compaction (metadata config)
     * @param hoodieTableType                  table type to initialise
     * @param i5                               timeline compaction batch size
     * @param hoodieFailedWritesCleaningPolicy failed-writes cleaning policy
     * @param writeConcurrencyMode             write concurrency mode
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3, int i4, HoodieTableType hoodieTableType, int i5, HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy, WriteConcurrencyMode writeConcurrencyMode) throws Exception {
        final boolean archiveBeyondSavepoint = ((Boolean) HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.defaultValue()).booleanValue();
        return initTestTableAndGetWriteConfig(z, i, i2, i3, i4, hoodieTableType, i5, hoodieFailedWritesCleaningPolicy, writeConcurrencyMode, archiveBeyondSavepoint);
    }

    /**
     * Initialises the harness for the given table type, builds the write config used by the
     * archival tests and creates the matching test table.
     *
     * @param z                                whether the metadata table is enabled
     * @param i                                first argument of {@code archiveCommitsWith}
     * @param i2                               second argument of {@code archiveCommitsWith}
     * @param i3                               max delta commits before compaction (compaction config)
     * @param i4                               max delta commits before compaction (metadata config)
     * @param hoodieTableType                  table type to initialise
     * @param i5                               timeline compaction batch size
     * @param hoodieFailedWritesCleaningPolicy failed-writes cleaning policy
     * @param writeConcurrencyMode             write concurrency mode
     * @param z2                               whether archival may go beyond savepoints
     * @return the write config the test table was initialised with
     * @throws Exception if the harness initialisation fails
     */
    private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean z, int i, int i2, int i3, int i4, HoodieTableType hoodieTableType, int i5, HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy, WriteConcurrencyMode writeConcurrencyMode, boolean z2) throws Exception {
        // Must run first: the configs below are built only after the harness has been initialised.
        init(hoodieTableType);

        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .retainCommits(1)
            .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
            .build();
        HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
            .withTimelineCompactionBatchSize(i5)
            .archiveCommitsWith(i, i2)
            .withArchiveBeyondSavepoint(z2)
            .build();
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(i3)
            .build();
        FileSystemViewStorageConfig viewStorageConfig = FileSystemViewStorageConfig.newBuilder()
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .build();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(z)
            .withMaxNumDeltaCommitsBeforeCompaction(i4)
            .build();
        HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .build();

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(tripSchema)
            .withParallelism(2, 2)
            .withCleanConfig(cleanConfig)
            .withArchivalConfig(archivalConfig)
            .withCompactionConfig(compactionConfig)
            .withFileSystemViewConfig(viewStorageConfig)
            .withMetadataConfig(metadataConfig)
            .withWriteConcurrencyMode(writeConcurrencyMode)
            .withLockConfig(lockConfig)
            .forTable("test-trip-table")
            .build();

        initWriteConfigAndMetatableWriter(writeConfig, z);
        return writeConfig;
    }

    /**
     * Running the archiver against a table that has no commits must report success.
     */
    @Test
    public void testArchiveEmptyTable() throws Exception {
        init();

        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(tripSchema)
            .withParallelism(2, 2)
            .forTable("test-trip-table")
            .build();

        metaClient = HoodieTableMetaClient.reload(metaClient);
        HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);

        Assertions.assertTrue(archiver.archiveIfRequired(context));
    }

    /**
     * Writes nine commits, archiving after each one, and checks that the timeline only changes
     * after the 6th and the 8th commit.
     *
     * @param z whether the metadata table is enabled
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testArchiveTableWithArchival(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2);

        for (int round = 1; round < 10; round++) {
            // Only the very first commit introduces the partitions.
            List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation("0000000" + round, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);

            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();

            switch (round) {
                case 6:
                    verifyArchival(
                        getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")),
                        getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000005", "00000006")),
                        after);
                    break;
                case 8:
                    verifyArchival(
                        getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004")),
                        getActiveCommitInstants(Arrays.asList("00000005", "00000006", "00000007", "00000008")),
                        after);
                    break;
                default:
                    // Every other round must leave the timeline untouched.
                    Assertions.assertEquals(before, after);
                    break;
            }
        }
    }

    /**
     * Writes two upserts followed by six insert-overwrite commits, archiving after each, and
     * checks the instant counts before and after the final archival.
     */
    @Test
    public void testArchiveTableWithReplaceCommits() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2);

        for (int round = 1; round < 9; round++) {
            String instantTime = "0000000" + round;
            if (round >= 3) {
                testTable.doWriteOperation(instantTime, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            } else {
                List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
                testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            }

            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();

            // Only the last round is asserted on.
            if (round == 8) {
                Assertions.assertEquals(6, before.size());
                Assertions.assertEquals(4, after.size());
            }
        }
    }

    /**
     * Creates ten commits at fixed offsets before "now" (two of them left inflight) and checks
     * that one archival pass removes exactly the first two instants from the timeline.
     *
     * @param str name of the {@link HoodieCleaningPolicy} to configure
     */
    @ValueSource(strings = {"KEEP_LATEST_BY_HOURS", "KEEP_LATEST_COMMITS"})
    @ParameterizedTest
    public void testArchivalWithAutoAdjustmentBasedOnCleanConfigs(String str) throws Exception {
        init();

        FileSystemViewStorageConfig viewStorageConfig = FileSystemViewStorageConfig.newBuilder()
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .build();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(5)
            .build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
            .withCleanerPolicy(HoodieCleaningPolicy.valueOf(str))
            .cleanerNumHoursRetained(1)
            .retainCommits(5)
            .build();
        HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(3, 4)
            .build();
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withFileSystemViewConfig(viewStorageConfig)
            .withMetadataConfig(metadataConfig)
            .withCleanConfig(cleanConfig)
            .withArchivalConfig(archivalConfig)
            .build();

        HoodieTestTable table = HoodieTestTable.of(metaClient);
        ZonedDateTime now = ZonedDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());

        // One entry per commit: minutes before "now", and whether the commit is completed.
        final int[] minutesAgo = {90, 80, 70, 50, 45, 40, 30, 20, 10, 5};
        final boolean[] completed = {true, true, true, true, true, true, false, false, true, true};

        List<HoodieInstant> expectedInstants = new ArrayList<>();
        for (int idx = 0; idx < minutesAgo.length; idx++) {
            expectedInstants.add(triggerCommit("2020/01/01", "2020/01/02", now, minutesAgo[idx], completed[idx], table, writeConfig));
        }

        metaClient = HoodieTableMetaClient.reload(metaClient);
        Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig, true);
        List<HoodieInstant> before = beforeAndAfter.getKey();
        List<HoodieInstant> after = beforeAndAfter.getValue();

        assertInstantListEquals(expectedInstants, before);
        assertInstantListEquals(expectedInstants.subList(2, expectedInstants.size()), after);
    }

    /**
     * Creates one commit touching two partitions, each with a single randomly named file id,
     * and keeps the metadata table in sync through a short-lived metadata writer.
     *
     * <p>The writer is managed by try-with-resources so it is always closed, and a failure while
     * closing is recorded as a suppressed exception rather than replacing the original failure.
     *
     * @param str               first partition path
     * @param str2              second partition path
     * @param zonedDateTime     reference time the instant time is derived from
     * @param i                 minutes subtracted from {@code zonedDateTime} to get the instant time
     * @param z                 {@code true} to complete the commit, {@code false} to leave it inflight
     * @param hoodieTestTable   test table the files are added to
     * @param hoodieWriteConfig write config used to create the metadata writer
     * @return the instant that was created
     * @throws Exception if creating the commit or closing the metadata writer fails
     */
    private HoodieInstant triggerCommit(final String str, final String str2, ZonedDateTime zonedDateTime, int i, boolean z, HoodieTestTable hoodieTestTable, HoodieWriteConfig hoodieWriteConfig) throws Exception {
        final String firstFileId = UUID.randomUUID().toString();
        final String secondFileId = UUID.randomUUID().toString();
        final String instantTime = HoodieActiveTimeline.formatDate(Date.from(zonedDateTime.minusMinutes(i).toInstant()));

        try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(this.hadoopConf, hoodieWriteConfig, this.context)) {
            Map<String, List<String>> partitionToFileIds = new HashMap<>();
            partitionToFileIds.put(str, CollectionUtils.createImmutableList(new String[]{firstFileId}));
            partitionToFileIds.put(str2, CollectionUtils.createImmutableList(new String[]{secondFileId}));
            return commitWithMdt(instantTime, Collections.unmodifiableMap(partitionToFileIds), hoodieTestTable, writer, true, true, z);
        }
    }

    /**
     * Adds an inflight commit and, when requested, creates its files, updates the metadata table
     * and transitions the commit to completed.
     *
     * @param str                       instant time of the commit
     * @param map                       partition path to the file ids to create in that partition
     * @param hoodieTestTable           test table the files are added to
     * @param hoodieTableMetadataWriter metadata writer that is updated for a completed commit
     * @param z                         whether base files are created
     * @param z2                        whether log files are created
     * @param z3                        {@code true} to complete the commit, {@code false} to leave it inflight
     * @return the completed or inflight instant, depending on {@code z3}
     * @throws Exception if the timeline or the metadata table cannot be updated
     */
    private HoodieInstant commitWithMdt(String str, Map<String, List<String>> map, HoodieTestTable hoodieTestTable, HoodieTableMetadataWriter hoodieTableMetadataWriter, boolean z, boolean z2, boolean z3) throws Exception {
        hoodieTestTable.addInflightCommit(str);
        if (z3) {
            Map<String, List<String>> partitionToFiles = new HashMap<>();
            // The two lambda levels use distinct names: the outer one is the partition, the inner one a file id.
            map.forEach((partition, fileIds) -> {
                try {
                    List<String> files = new ArrayList<>();
                    FileCreateUtils.createPartitionMetaFile(this.basePath, partition);
                    if (z) {
                        files.addAll((Collection<String>) hoodieTestTable.withBaseFilesInPartition(partition, fileIds.toArray(new String[0])).getValue());
                    }
                    if (z2) {
                        fileIds.forEach(fileId -> {
                            try {
                                files.addAll((Collection<String>) hoodieTestTable.withLogFile(partition, fileId, new int[]{1, 2}).getValue());
                            } catch (Exception e) {
                                // Still best effort, as before, but reported through the logger.
                                LOG.error("Failed to add log files for file id {} in partition {}", fileId, partition, e);
                            }
                        });
                    }
                    partitionToFiles.put(partition, files);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            HoodieCommitMetadata commitMetadata = generateCommitMetadata(str, partitionToFiles);
            hoodieTableMetadataWriter.performTableServices(Option.of(str));
            hoodieTableMetadataWriter.updateFromWriteStatuses(commitMetadata, this.context.emptyHoodieData(), str);
            this.metaClient.getActiveTimeline().saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, "commit", str), TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
        }
        // An inflight commit needs no metadata: the previously discarded generateCommitMetadata call was dropped.
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return new HoodieInstant(z3 ? HoodieInstant.State.COMPLETED : HoodieInstant.State.INFLIGHT, "commit", str);
    }

    /**
     * Builds commit metadata containing one write stat per (partition, file id) pair.
     *
     * <p>The nested lambdas use distinct parameter names so that the partition path and the file id
     * cannot be confused; reusing one name for both does not compile.
     *
     * @param str instant time used to derive the base file name
     * @param map partition path to the file ids written in that partition
     * @return commit metadata with a placeholder schema and the generated write stats
     */
    protected static HoodieCommitMetadata generateCommitMetadata(String str, Map<String, List<String>> map) {
        HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
        commitMetadata.addMetadata("schema", "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", \"name\": \"PhonyRecord\", \"fields\": []}");
        map.forEach((partitionPath, fileIds) -> fileIds.forEach(fileId -> {
            HoodieWriteStat writeStat = new HoodieWriteStat();
            writeStat.setPartitionPath(partitionPath);
            writeStat.setPath(partitionPath + "/" + HoodieTestCommitGenerator.getBaseFilename(str, fileId));
            writeStat.setFileId(fileId);
            writeStat.setTotalWriteBytes(1L);
            writeStat.setFileSizeInBytes(1L);
            commitMetadata.addWriteStat(partitionPath, writeStat);
        }));
        return commitMetadata;
    }

    /**
     * Checks archival around a savepoint on commit 3, first with the savepoint in place and then
     * after it has been deleted.
     *
     * @param z value passed as the archive-beyond-savepoint setting
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSavepointWithArchival(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE, 10, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, z);

        // Commits 1..4; only the first one introduces the partitions.
        for (int commit = 1; commit < 5; commit++) {
            List<String> newPartitions = commit == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation(String.format("%08d", Integer.valueOf(commit)), WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
        }

        String savepointTime = String.format("%08d", 3);
        testTable.addSavepoint(savepointTime, testTable.doSavepoint(savepointTime));

        // Commits 5 and 6.
        for (int commit = 5; commit < 7; commit++) {
            testTable.doWriteOperation(String.format("%08d", Integer.valueOf(commit)), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        }

        List<HoodieInstant> afterFirstArchival = archiveAndGetCommitsList(writeConfig).getValue();
        List<String> archivedTimes = z
            ? Arrays.asList("00000001", "00000002", "00000004", "00000005")
            : Arrays.asList("00000001", "00000002");
        List<String> activeTimes = z
            ? Arrays.asList("00000003", "00000006")
            : Arrays.asList("00000003", "00000004", "00000005", "00000006");
        List<HoodieInstant> expectedArchived = getAllArchivedCommitInstants(archivedTimes);
        List<HoodieInstant> expectedActive = Stream.concat(
                getActiveCommitInstants(activeTimes).stream(),
                getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
            .collect(Collectors.toList());
        verifyArchival(expectedArchived, expectedActive, afterFirstArchival);

        // Commits 7..9, then drop the savepoint and archive again.
        for (int commit = 7; commit < 10; commit++) {
            testTable.doWriteOperation(String.format("%08d", Integer.valueOf(commit)), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        }
        testTable.deleteSavepoint(savepointTime);

        List<HoodieInstant> afterSecondArchival = archiveAndGetCommitsList(writeConfig).getValue();
        metaClient.reloadActiveTimeline();
        verifyArchival(
            getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007")),
            getActiveCommitInstants(Arrays.asList("00000008", "00000009")),
            afterSecondArchival);
    }

    /**
     * Overwrites the LSM timeline version file with invalid content, keeps writing and archiving,
     * and finally drops a bogus parquet file into the archive folder; reloading the archived
     * timeline must still work.
     */
    @Test
    public void testCompactionWithCorruptVersionFile() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

        for (int round = 1; round < 10; round++) {
            List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation("0000000" + round, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            archiveAndGetCommitsList(writeConfig);
        }

        // Replace the version file with content that is not a valid version.
        metaClient.getFs().delete(LSMTimeline.getVersionFilePath(metaClient));
        FileIOUtils.createFileInPath(metaClient.getFs(), LSMTimeline.getVersionFilePath(metaClient), Option.of(StringUtils.getUTF8Bytes("invalid_version")));

        int activeCount = new HoodieActiveTimeline(metaClient, false).countInstants();
        int archivedCount = metaClient.getArchivedTimeline().reload().countInstants();
        Assertions.assertEquals(19, activeCount + archivedCount);

        for (int round = 1; round < 10; round++) {
            List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation("1000000" + round, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            archiveAndGetCommitsList(writeConfig);
        }

        int archivedCountAfter = metaClient.getArchivedTimeline().reload().countInstants();
        int activeCountAfter = new HoodieActiveTimeline(metaClient, false).countInstants();
        Assertions.assertEquals(26, archivedCountAfter + activeCountAfter);

        // A file with a plausible name but junk content must not break loading.
        Path bogusDataFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
        FileIOUtils.createFileInPath(metaClient.getFs(), bogusDataFile, Option.of(StringUtils.getUTF8Bytes("dummy")));
        Assertions.assertDoesNotThrow(() -> {
            return metaClient.getArchivedTimeline().reload();
        }, "Archived timeline can skip the invalid data and manifest files smartly");
    }

    /**
     * Compacts the files of the latest snapshot manifest directly through the timeline writer
     * and checks that the total number of active plus archived instants is still 19.
     */
    @Test
    public void testCompactionRecoverWithoutManifestFile() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

        for (int round = 1; round < 10; round++) {
            List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation("0000000" + round, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            archiveAndGetCommitsList(writeConfig);
        }

        HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
        LSMTimelineWriter timelineWriter = LSMTimelineWriter.getInstance(writeConfig, table);

        List<String> manifestFileNames = LSMTimeline.latestSnapshotManifest(metaClient)
            .getFiles()
            .stream()
            .sorted()
            .map(entry -> entry.getFileName())
            .collect(Collectors.toList());
        timelineWriter.compactFiles(manifestFileNames, LSMTimelineWriter.compactedFileName(manifestFileNames));

        int activeCount = new HoodieActiveTimeline(metaClient, false).countInstants();
        // The archived timeline is reloaded twice, exactly as before.
        int archivedCount = metaClient.getArchivedTimeline().reload().reload().countInstants();
        Assertions.assertEquals(19, activeCount + archivedCount);
    }

    /**
     * Writes and archives eighteen commits, then checks the total instant count, the latest
     * snapshot version and the set of snapshot versions that are still present.
     */
    @Test
    public void testCompactionCleaning() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

        for (int round = 1; round < 19; round++) {
            String instantTime = metaClient.createNewInstantTime();
            List<String> newPartitions = round == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            archiveAndGetCommitsList(writeConfig);
        }

        int activeCount = new HoodieActiveTimeline(metaClient, false).countInstants();
        int archivedCount = metaClient.getArchivedTimeline().countInstants();
        Assertions.assertEquals(26, activeCount + archivedCount);

        Assertions.assertEquals(9, LSMTimeline.latestSnapshotVersion(metaClient));
        Assertions.assertEquals(
            Arrays.asList(7, 8, 9),
            LSMTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList()));
    }

    /**
     * On a merge-on-read table, alternates upserts and compactions with archival in between,
     * then verifies that the archived compaction instants expose plan details that can be
     * deserialized as compaction plans.
     */
    @Test
    public void testReadArchivedCompactionPlan() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 5, HoodieTableType.MERGE_ON_READ);

        // Each round is one upsert at instant i followed by a compaction at instant i + 1.
        for (int i = 1; i < 11; i += 2) {
            List<String> newPartitions = i == 1 ? Arrays.asList("p1", "p2") : Collections.<String>emptyList();
            this.testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, newPartitions, Arrays.asList("p1", "p2"), 2);
            this.testTable.doCompaction(String.format("%08d", i + 1), Arrays.asList("p1", "p2"));
            archiveAndGetCommitsList(writeConfig);
        }

        HoodieArchivedTimeline archivedTimeline = this.metaClient.getArchivedTimeline();
        archivedTimeline.loadCompactionDetailsInMemory("00000001", "00000011");

        List<HoodieInstant> archivedInstants = archivedTimeline.getCommitTimeline().getInstants();
        Assertions.assertEquals(2, archivedInstants.size(), "Two compactions instants should be archived.");

        List<Option<byte[]>> planDetails = archivedInstants.stream()
            .map(archivedTimeline::getInstantDetails)
            .collect(Collectors.toList());
        Assertions.assertTrue(planDetails.stream().allMatch(Option::isPresent), "All the compaction instants should have plan details.");

        for (Option<byte[]> details : planDetails) {
            Assertions.assertDoesNotThrow(() -> {
                return TimelineMetadataUtils.deserializeCompactionPlan(details.get());
            });
        }
    }

    /**
     * Multi-writer archival with the metadata table disabled; currently disabled (HUDI-6841).
     *
     * <p>{@code @Test} is declared alongside {@code @Disabled} so JUnit discovers the method and
     * reports it as skipped, and so it runs again once {@code @Disabled} is removed.
     */
    @Test
    @Disabled("HUDI-6841")
    public void testArchivalWithMultiWritersMDTDisabled() throws Exception {
        testArchivalWithMultiWriters(false);
    }

    /**
     * Multi-writer archival with the metadata table enabled; currently disabled (HUDI-6386).
     *
     * <p>{@code @Test} is declared alongside {@code @Disabled} so JUnit discovers the method and
     * reports it as skipped, and so it runs again once {@code @Disabled} is removed.
     */
    @Test
    @Disabled("HUDI-6386")
    public void testArchivalWithMultiWriters() throws Exception {
        testArchivalWithMultiWriters(true);
    }

    /**
     * Drives one writer against two concurrent archiver tasks and waits for archival to catch up.
     *
     * <p>The calling thread writes instants 1 through 29. After the sixth write a latch releases two
     * background tasks, each of which keeps calling {@code archiveIfRequired} until the last completed
     * commit ends with "29" and at most 5 completed commits remain in the active timeline.
     *
     * <p>The executor is always shut down, including when a write on the calling thread fails, so the
     * archiver threads cannot outlive the test.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}; when
     *          {@code true} every archiver iteration additionally sleeps for 2 ms
     * @throws Exception if a write fails or one of the archiver tasks completes exceptionally
     */
    private void testArchivalWithMultiWriters(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 5, 2, HoodieTableType.COPY_ON_WRITE, 10, HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
        ExecutorService archiverPool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Boolean>> archiverTasks = new ArrayList<>();
            CountDownLatch writerWarmedUp = new CountDownLatch(1);
            for (int task = 0; task < 2; task++) {
                archiverTasks.add(CompletableFuture.supplyAsync(() -> {
                    HoodieSparkTable table = HoodieSparkTable.create(writeConfig, this.context, this.metaClient);
                    try {
                        // Wait (at most 30s) for the writer to produce its first commits; a timeout is
                        // not treated as a failure, the archiver simply starts anyway.
                        writerWarmedUp.await(30L, TimeUnit.SECONDS);
                        this.metaClient.reloadActiveTimeline();
                        while (true) {
                            HoodieTimeline completedCommits = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
                            boolean writerFinished = completedCommits.lastInstant().get().getTimestamp().endsWith("29");
                            if (writerFinished && completedCommits.countInstants() <= 5) {
                                return true;
                            }
                            new HoodieTimelineArchiver(writeConfig, table).archiveIfRequired(this.context, true);
                            if (z) {
                                Thread.sleep(2L);
                            }
                            table.getMetaClient().reloadActiveTimeline();
                        }
                    } catch (IOException e) {
                        throw new HoodieException("IOException thrown while archiving ", e);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for whoever owns this pool thread.
                        Thread.currentThread().interrupt();
                        throw new HoodieException("Should not have thrown InterruptedException ", e);
                    }
                }, archiverPool));
            }

            for (int commit = 1; commit < 30; commit++) {
                this.testTable.doWriteOperation("0000000" + String.format("%02d", commit), WriteOperationType.UPSERT, commit == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
                if (commit == 6) {
                    // Enough commits exist now; let the archivers start.
                    writerWarmedUp.countDown();
                }
            }

            allOfTerminateOnFailure(archiverTasks).get();
        } finally {
            archiverPool.shutdownNow();
        }
    }

    /**
     * Combines the given futures into one that completes when all of them succeed, and that fails
     * fast with the first observed failure.
     *
     * <p>When the first future completes exceptionally, the returned future is completed with that
     * same throwable and every other future in the list is cancelled. Later failures (including the
     * cancellations triggered here) are ignored.
     *
     * @param list futures to wait for
     * @return a future that completes normally once every input has completed normally, or
     *         exceptionally as soon as one input fails
     */
    private static CompletableFuture allOfTerminateOnFailure(List<CompletableFuture<Boolean>> list) {
        CompletableFuture<Object> firstFailure = new CompletableFuture<>();
        AtomicBoolean failureSeen = new AtomicBoolean(false);
        for (CompletableFuture<Boolean> current : list) {
            current.exceptionally(th -> {
                // Only the first failing future reports and cancels; the rest are no-ops.
                if (failureSeen.compareAndSet(false, true)) {
                    LOG.warn("One of the job failed. Cancelling all other futures. " + th.getCause() + ", " + th.getMessage());
                    // Publish the real cause before cancelling, so the caller sees it rather than a
                    // CancellationException from one of the siblings cancelled below.
                    firstFailure.completeExceptionally(th);
                    for (CompletableFuture<Boolean> other : list) {
                        if (other != current) {
                            other.cancel(true);
                        }
                    }
                }
                return null;
            });
        }
        return CompletableFuture.anyOf(firstFailure, CompletableFuture.allOf(list.toArray(new CompletableFuture[0])));
    }

    /**
     * Writes five instants and then two more using the extended {@code doWriteOperation} overload
     * (flags {@code false, true}), asserting after every archival attempt that the instant list
     * before archival equals the one after it, i.e. nothing was archived.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}
     * @throws Exception if a write or archival attempt fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2);

        // Phase 1: five regular writes, each followed by a no-op archival.
        for (int seq = 1; seq <= 5; seq++) {
            String instantTime = "0000000" + seq;
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2);
            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();
            Assertions.assertEquals(before, after);
        }

        // Phase 2: two additional writes through the 7-argument overload, then one archival attempt.
        for (String instantTime : new String[] {"00000006", "00000007"}) {
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2, false, true);
        }
        Pair<List<HoodieInstant>, List<HoodieInstant>> finalBeforeAndAfter = archiveAndGetCommitsList(writeConfig);
        List<HoodieInstant> finalBefore = finalBeforeAndAfter.getKey();
        List<HoodieInstant> finalAfter = finalBeforeAndAfter.getValue();
        Assertions.assertEquals(finalBefore, finalAfter);
    }

    /**
     * Supplies every combination of the two boolean parameters of
     * {@code testArchiveCommitSavepointNoHole}.
     *
     * @return four argument rows: (true, true), (false, true), (true, false), (false, false)
     */
    private static Stream<Arguments> archiveCommitSavepointNoHoleParams() {
        return Stream.of(
            Arguments.of(true, true),
            Arguments.of(false, true),
            Arguments.of(true, false),
            Arguments.of(false, false));
    }

    /**
     * Verifies how archival treats commits around a savepoint.
     *
     * <p>Creates commits 100 through 105 with a savepoint on 101, runs the archiver configured with
     * {@code archiveCommitsWith(2, 5)}, and checks the remaining completed commits:
     * <ul>
     *   <li>archive-beyond-savepoint disabled: five commits remain and 101, 102 and 103 are still
     *       active;</li>
     *   <li>archive-beyond-savepoint enabled: only 101 and 105 remain, so 102, 103 and 104 are gone.</li>
     * </ul>
     *
     * @param z  value passed to {@code HoodieMetadataConfig.Builder#enable}; when {@code true} a
     *           compaction commit "105" is also created in the metadata table
     * @param z2 value passed to {@code withArchiveBeyondSavepoint}
     * @throws Exception if test setup or archival fails
     */
    @MethodSource({"archiveCommitSavepointNoHoleParams"})
    @ParameterizedTest
    public void testArchiveCommitSavepointNoHole(boolean z, boolean z2) throws Exception {
        init();
        String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(schema)
            .withParallelism(2, 2)
            .forTable("test-trip-table")
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).withArchiveBeyondSavepoint(z2).build())
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(z).build())
            .build();

        // Six commits, with a savepoint pinned on the second one.
        HoodieTestDataGenerator.createCommitFile(this.basePath, "100", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "101", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createSavepointFile(this.basePath, "101", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "102", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "103", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "104", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "105", this.wrapperFs.getConf());
        HoodieTimelineArchiver hoodieTimelineArchiver = new HoodieTimelineArchiver(build, HoodieSparkTable.create(build, this.context));
        if (z) {
            HoodieTestUtils.createCompactionCommitInMetadataTable(this.hadoopConf, this.wrapperFs, this.basePath, "105");
        }

        Assertions.assertEquals(6, this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(), "Loaded 6 commits and the count should match");
        Assertions.assertTrue(hoodieTimelineArchiver.archiveIfRequired(this.context));
        HoodieTimeline remaining = this.metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();

        if (!z2) {
            Assertions.assertEquals(5, remaining.countInstants(), "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
            Assertions.assertTrue(remaining.containsInstant(new HoodieInstant(false, "commit", "101")), "Archived commits should always be safe");
            Assertions.assertTrue(remaining.containsInstant(new HoodieInstant(false, "commit", "102")), "Archived commits should always be safe");
            Assertions.assertTrue(remaining.containsInstant(new HoodieInstant(false, "commit", "103")), "Archived commits should always be safe");
            return;
        }

        Assertions.assertEquals(2, remaining.countInstants(), "Since archiveBeyondSavepoint config is enabled, we will archive commits 102, 103 ");
        Assertions.assertTrue(remaining.containsInstant(new HoodieInstant(false, "commit", "101")), "Savepointed commits should always be safe");
        Assertions.assertFalse(remaining.containsInstant(new HoodieInstant(false, "commit", "102")), "102 expected to be archived");
        Assertions.assertFalse(remaining.containsInstant(new HoodieInstant(false, "commit", "103")), "103 expected to be archived");
        // Previously this line re-checked 105; with only 101 and 105 left, 104 must be absent.
        Assertions.assertFalse(remaining.containsInstant(new HoodieInstant(false, "commit", "104")), "104 expected to be archived");
        Assertions.assertTrue(remaining.containsInstant(new HoodieInstant(false, "commit", "105")), "105 expected to be in active timeline");
    }

    /**
     * Creates a pending replace file at instant 00000000, performs seven writes with an archival
     * attempt after each, and asserts that no attempt changes the instant list and that all seven
     * completed commits are still in the active timeline at the end.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}
     * @throws Exception if a write or archival attempt fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPendingClusteringWillBlockArchival(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2);
        HoodieTestDataGenerator.createPendingReplaceFile(this.basePath, "00000000", this.wrapperFs.getConf());

        for (int seq = 1; seq <= 7; seq++) {
            String instantTime = "0000000" + seq;
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2);
            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();
            Assertions.assertEquals(before, after);
        }

        HoodieTimeline completedCommits = this.metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
        Assertions.assertEquals(7, completedCommits.countInstants(), "Since we have a pending clustering instant at 00000000, we should never archive any commit after 00000000");
    }

    /**
     * Alternates writes (odd instants 1..11) with rollbacks (the following even instant) and runs
     * archival after each pair.
     *
     * <p>Archival is expected to be a no-op for every pair except the one starting at instant 11,
     * where commits 1 and 3 and rollback 2 must be archived while the later commits and rollbacks
     * stay active.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}
     * @throws Exception if a write, rollback or archival attempt fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testArchiveRollbacksTestTable(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2);
        for (int seq = 1; seq < 13; seq += 2) {
            String writeInstant = "000000" + String.format("%02d", Integer.valueOf(seq));
            this.testTable.doWriteOperation(writeInstant, WriteOperationType.UPSERT, seq == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            this.testTable.doRollback("000000" + String.format("%02d", Integer.valueOf(seq)), "000000" + String.format("%02d", Integer.valueOf(seq + 1)));

            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();

            if (seq == 11) {
                // The only round in which archival is expected to move instants.
                List<HoodieInstant> expectedArchived = new ArrayList<>();
                expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003")));
                expectedArchived.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000002"), "rollback"));
                List<HoodieInstant> expectedActive = new ArrayList<>();
                expectedActive.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000007", "00000009", "00000011")));
                expectedActive.addAll(getActiveCommitInstants(Arrays.asList("00000004", "00000006", "00000008", "00000010", "00000012"), "rollback"));
                verifyArchival(expectedArchived, expectedActive, after);
            } else {
                Assertions.assertEquals(before, after);
            }
        }
    }

    /**
     * Verifies that archival is a no-op while a compaction requested at instant 2 has not completed,
     * and that archival proceeds once it has.
     *
     * <p>Instants 1 and 3..7 are delta commits; instant 2 is a compaction started through
     * {@code doCompaction(..., true)} and only completed after the loop via
     * {@code moveInflightCompactionToComplete}. After one more write (instant 8), instants 1 through
     * 4 are expected to be archived and 5 through 8 to remain active.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}; the
     *          per-instant content check at step 7 only runs when this is {@code false}
     * @throws Exception if a table operation or archival attempt fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testNoArchivalWithInflightCompactionInMiddle(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2, 2, HoodieTableType.MERGE_ON_READ);
        HoodieCommitMetadata inflightCompactionMetadata = null;
        for (int i = 1; i < 8; i++) {
            if (i == 2) {
                inflightCompactionMetadata = this.testTable.doCompaction("0000000" + i, Arrays.asList("p1", "p2"), true);
            } else {
                this.testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            }

            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();

            if (i != 7) {
                Assertions.assertEquals(before, after);
                continue;
            }

            // Expected value goes first so that a failure message reads correctly.
            Assertions.assertEquals(0, before.size() - after.size());
            if (!z) {
                for (int j = 1; j <= 7; j++) {
                    if (j == 2) {
                        Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", "0000000" + j)));
                    } else {
                        Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", "0000000" + j)));
                    }
                }
            }
        }

        // Complete the compaction, write once more, and archival should now make progress.
        this.testTable.moveInflightCompactionToComplete("00000002", inflightCompactionMetadata);
        this.testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 2);
        List<HoodieInstant> afterFinalArchival = archiveAndGetCommitsList(writeConfig).getValue();
        List<HoodieInstant> expectedArchived = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004"), "deltacommit");
        expectedArchived.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "00000002"));
        verifyArchival(expectedArchived, getActiveCommitInstants(Arrays.asList("00000005", "00000006", "00000007", "00000008"), "deltacommit"), afterFinalArchival);
    }

    /**
     * Creates commits 1 through 5 (plus a marker directory for commit 2), archives with
     * {@code archiveCommitsWith(2, 3)}, and asserts that exactly commits 1, 2 and 3 are in the
     * archived timeline and that the marker directory of commit 2 no longer exists.
     *
     * @param z value passed to {@code HoodieMetadataConfig.Builder#enable}; when {@code true} a
     *          compaction commit "5" is also created in the metadata table
     * @throws Exception if test setup or archival fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testArchiveCommitTimeline(boolean z) throws Exception {
        init();
        String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(schema)
            .withParallelism(2, 2)
            .forTable("test-trip-table")
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(z).build())
            .build();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        // Commit 1.
        HoodieTestDataGenerator.createCommitFile(this.basePath, "1", this.wrapperFs.getConf());
        HoodieInstant firstCommit = new HoodieInstant(false, "commit", "1");
        // Commit 2, together with its marker directory.
        HoodieTestDataGenerator.createCommitFile(this.basePath, "2", this.wrapperFs.getConf());
        Path markerDirOfSecondCommit = new Path(this.metaClient.getMarkerFolderPath("2"));
        this.wrapperFs.mkdirs(markerDirOfSecondCommit);
        HoodieInstant secondCommit = new HoodieInstant(false, "commit", "2");
        // Commit 3.
        HoodieTestDataGenerator.createCommitFile(this.basePath, "3", this.wrapperFs.getConf());
        HoodieInstant thirdCommit = new HoodieInstant(false, "commit", "3");
        // Commits 4 and 5 are expected to stay active.
        HoodieTestDataGenerator.createCommitFile(this.basePath, "4", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createCommitFile(this.basePath, "5", this.wrapperFs.getConf());
        if (z) {
            HoodieTestUtils.createCompactionCommitInMetadataTable(this.hadoopConf, this.wrapperFs, this.basePath, "5");
        }

        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, HoodieSparkTable.create(writeConfig, this.context, this.metaClient));
        Assertions.assertTrue(archiver.archiveIfRequired(this.context));

        Assertions.assertEquals(new HashSet(Arrays.asList(firstCommit, secondCommit, thirdCommit)), this.metaClient.getArchivedTimeline().filterCompletedInstants().getInstantsAsStream().collect(Collectors.toSet()));
        Assertions.assertFalse(this.wrapperFs.exists(markerDirOfSecondCommit));
    }

    /**
     * Asserts the number of inflight "clean" instants in the freshly reloaded active timeline.
     *
     * @param hoodieTableMetaClient meta client whose active timeline is reloaded and inspected
     * @param i                     expected number of inflight clean instants
     */
    private void verifyInflightInstants(HoodieTableMetaClient hoodieTableMetaClient, int i) {
        HoodieTimeline inflightCleans = hoodieTableMetaClient.getActiveTimeline()
            .reload()
            .getTimelineOfActions(Collections.singleton("clean"))
            .filterInflights();
        Assertions.assertEquals(i, inflightCleans.countInstants(), "Loaded inflight clean actions and the count should match");
    }

    /**
     * Checks that {@code MetadataConversionUtils.convertCommitMetadata} carries the operation type
     * of a {@link HoodieCommitMetadata} over as the string form of the {@link WriteOperationType}.
     *
     * @throws Exception if test initialization fails
     */
    @Test
    public void testConvertCommitMetadata() throws Exception {
        init();
        HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
        hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        // Expected value first, converted value second, so a failure message reads correctly.
        Assertions.assertEquals(WriteOperationType.INSERT.toString(), MetadataConversionUtils.convertCommitMetadata(hoodieCommitMetadata).getOperationType());
    }

    /**
     * Mixes one initial write (instant 1), two cleans (instants 2 and 3) and five further writes
     * (instants 4..8), running archival after each step.
     *
     * <p>Archival is expected to move instants exactly once: at step 7 when {@code z} is
     * {@code false}, or at step 8 when {@code z} is {@code true}. Every other step must leave the
     * instant list unchanged.
     *
     * @param z forwarded as the first argument of {@code initTestTableAndGetWriteConfig}
     * @throws Exception if a table operation or archival attempt fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testArchiveTableWithCleanCommits(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 2, 4, 8);
        Map<String, Integer> filesToCleanPerPartition = new HashMap<>();
        filesToCleanPerPartition.put("p1", 1);
        filesToCleanPerPartition.put("p2", 2);

        for (int step = 1; step <= 8; step++) {
            String instantTime = String.format("%08d", Integer.valueOf(step));
            if (step == 1) {
                this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
            } else if (step <= 3) {
                this.testTable.doClean(instantTime, filesToCleanPerPartition);
            } else {
                this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            }

            Pair<List<HoodieInstant>, List<HoodieInstant>> beforeAndAfter = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = beforeAndAfter.getKey();
            List<HoodieInstant> after = beforeAndAfter.getValue();

            boolean archivalAtStepSeven = step == 7 && !z;
            boolean archivalAtStepEight = step == 8 && z;

            if (archivalAtStepSeven) {
                List<HoodieInstant> expectedActive = new ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007")));
                List<HoodieInstant> expectedArchived = new ArrayList<>();
                expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000004", "00000005")));
                expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003"), "clean"));
                verifyArchival(expectedArchived, expectedActive, after);
            } else if (archivalAtStepEight) {
                List<HoodieInstant> expectedActive = new ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
                List<HoodieInstant> expectedArchived = new ArrayList<>();
                expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000004", "00000005", "00000006")));
                expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003"), "clean"));
                verifyArchival(expectedArchived, expectedActive, after);
            } else {
                // No archival expected at this step.
                Assertions.assertEquals(before, after);
            }
        }
    }

    /**
     * Builds a timeline of one write (instant 1), three cleans (2..4) and four write/rollback pairs
     * (5/6, 7/8, 9/10, 11/12), then archives once and verifies which instants were archived and
     * which remain active.
     *
     * @throws Exception if a table operation or archival fails
     */
    @Test
    public void testArchiveRollbacksAndCleanTestTable() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
        this.testTable.doWriteOperation(String.format("%08d", 1), WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);

        Map<String, Integer> filesToCleanPerPartition = new HashMap<>();
        filesToCleanPerPartition.put("p1", 1);
        filesToCleanPerPartition.put("p2", 1);
        for (int cleanSeq = 2; cleanSeq <= 4; cleanSeq++) {
            this.testTable.doClean(String.format("%08d", Integer.valueOf(cleanSeq)), filesToCleanPerPartition);
        }

        // Each odd instant is a write that is rolled back by the next even instant.
        for (int writeSeq = 5; writeSeq <= 11; writeSeq += 2) {
            String writeInstant = String.format("%08d", Integer.valueOf(writeSeq));
            this.testTable.doWriteOperation(writeInstant, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            this.testTable.doRollback(String.format("%08d", Integer.valueOf(writeSeq)), String.format("%08d", Integer.valueOf(writeSeq + 1)));
        }

        List<HoodieInstant> afterArchival = archiveAndGetCommitsList(writeConfig).getValue();

        List<HoodieInstant> expectedActive = new ArrayList<>();
        expectedActive.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000010", "00000012"), "rollback"));
        expectedActive.addAll(getActiveCommitInstants(Arrays.asList("00000009", "00000011")));

        List<HoodieInstant> expectedArchived = new ArrayList<>();
        expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000005", "00000007")));
        expectedArchived.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), "clean"));
        expectedArchived.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000006"), "rollback"));

        verifyArchival(expectedArchived, expectedActive, afterArchival);
    }

    /**
     * Creates eleven clean instants followed by eleven commit/rollback pairs, archives with
     * {@code archiveCommitsWith(2, 10)}, and asserts that two commits and two rollbacks remain
     * active and that every created clean/rollback instant older than the first active instant is
     * present in the archived timeline.
     *
     * @param z  when {@code true} the last boolean argument of {@code createCleanMetadata} and
     *           {@code createCommitAndRollbackFile} is always {@code true}; otherwise it is
     *           {@code true} only for even iterations
     * @param z2 value passed to {@code HoodieMetadataConfig.Builder#enable}; when {@code true} a
     *           compaction commit "99" is also created in the metadata table
     * @throws Exception if test setup or archival fails
     */
    @ParameterizedTest
    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
    public void testArchiveCompletedRollbackAndClean(boolean z, boolean z2) throws Exception {
        init();
        String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(schema)
            .withParallelism(2, 2)
            .forTable("test-trip-table")
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 10).build())
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(z2).build())
            .build();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        final int rounds = 10 + 1;
        List<HoodieInstant> createdInstants = new ArrayList<>();
        int nextInstant = 1;

        // Eleven clean instants, numbered consecutively.
        for (int round = 0; round < rounds; round++, nextInstant++) {
            String cleanTime = String.format("%02d", Integer.valueOf(nextInstant));
            createCleanMetadata(cleanTime, false, false, z || round % 2 == 0);
            createdInstants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "clean", String.format("%02d", Integer.valueOf(nextInstant))));
        }

        // Eleven commit/rollback pairs; each pair consumes two instant numbers.
        for (int round = 0; round < rounds; round++, nextInstant += 2) {
            createCommitAndRollbackFile((nextInstant + 1) + "", nextInstant + "", false, z || round % 2 == 0);
            createdInstants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "rollback", String.format("%02d", Integer.valueOf(nextInstant))));
        }

        if (z2) {
            HoodieTestUtils.createCompactionCommitInMetadataTable(this.hadoopConf, this.wrapperFs, this.basePath, Integer.toString(99));
        }

        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, HoodieSparkTable.create(writeConfig, this.context, this.metaClient));
        archiver.archiveIfRequired(this.context);

        Map<String, List<HoodieInstant>> activeByAction = this.metaClient.getActiveTimeline().reload().getInstantsAsStream()
            .collect(Collectors.groupingBy(HoodieInstant::getAction));
        Assertions.assertTrue(activeByAction.containsKey("commit"), "Commit Action key must be preset");
        Assertions.assertEquals(2, activeByAction.get("commit").size(), "Should have min instant");
        Assertions.assertTrue(activeByAction.containsKey("rollback"), "Rollback Action key must be preset");
        Assertions.assertEquals(2, activeByAction.get("rollback").size(), "Should have min instant");

        this.metaClient.getArchivedTimeline().loadCompletedInstantDetailsInMemory();
        HoodieInstant firstActive = this.metaClient.reloadActiveTimeline().firstInstant().get();
        // Everything created before the first still-active instant must have been archived.
        for (HoodieInstant created : createdInstants) {
            if (HoodieTimeline.compareTimestamps(created.getTimestamp(), HoodieTimeline.LESSER_THAN, firstActive.getTimestamp())) {
                Assertions.assertTrue(this.metaClient.getArchivedTimeline().containsInstant(created));
            }
        }
    }

    /**
     * Verifies how many commits the archiver removes from the active timeline as UPSERT
     * commits accumulate, using the config built by
     * {@code initTestTableAndGetWriteConfig(true, 4, 5, 7)}.
     *
     * <p>NOTE(review): the arguments presumably mean (metadata table enabled, min commits to
     * keep, max commits to keep, metadata-table delta commits before compaction) - confirm
     * against the helper.
     *
     * <p>Progression pinned by the assertions:
     * <ul>
     *   <li>commits 1-6: archival changes nothing;</li>
     *   <li>commit 7: the 3 oldest commits are archived;</li>
     *   <li>commits 8-10: 3 more commits are archived;</li>
     *   <li>commits 11-13: archival changes nothing;</li>
     *   <li>commit 14: 4 more commits are archived, leaving commits 11-14 active.</li>
     * </ul>
     *
     * @throws Exception if writing test commits or archiving fails
     */
    @Test
    public void testArchiveTableWithMetadataTableCompaction() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 7);
        List<String> instantTimes = new ArrayList<>();

        // Commits 1-6: every archival attempt must leave the completed commits untouched.
        for (int i = 1; i < 7; i++) {
            String instantTime = this.metaClient.createNewInstantTime();
            instantTimes.add(instantTime);
            // Only the first write passes a non-empty list as the third argument.
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            Pair<List<HoodieInstant>, List<HoodieInstant>> unchanged = archiveAndGetCommitsList(writeConfig);
            Assertions.assertEquals(unchanged.getKey(), unchanged.getValue());
        }

        // Commit 7: the three oldest commits get archived.
        String seventh = this.metaClient.createNewInstantTime();
        instantTimes.add(seventh);
        this.testTable.doWriteOperation(seventh, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        Pair<List<HoodieInstant>, List<HoodieInstant>> commits = archiveAndGetCommitsList(writeConfig);
        List<HoodieInstant> before = commits.getKey();
        List<HoodieInstant> after = commits.getValue();
        Assertions.assertEquals(3, before.size() - after.size());
        verifyArchival(getAllArchivedCommitInstants(instantTimes.subList(0, 3)), getActiveCommitInstants(instantTimes.subList(3, 7)), after);

        // Commits 8-10, archived once at the end: three more commits get archived.
        for (int i = 0; i < 3; i++) {
            String instantTime = this.metaClient.createNewInstantTime();
            instantTimes.add(instantTime);
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        }
        commits = archiveAndGetCommitsList(writeConfig);
        before = commits.getKey();
        after = commits.getValue();
        Assertions.assertEquals(3, before.size() - after.size());
        verifyArchival(getAllArchivedCommitInstants(instantTimes.subList(0, 6)), getActiveCommitInstants(instantTimes.subList(6, 10)), after);

        // Commits 11-12: nothing further is archived.
        for (int i = 0; i < 2; i++) {
            String instantTime = this.metaClient.createNewInstantTime();
            instantTimes.add(instantTime);
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        }
        commits = archiveAndGetCommitsList(writeConfig);
        before = commits.getKey();
        after = commits.getValue();
        Assertions.assertEquals(before, after);
        verifyArchival(getAllArchivedCommitInstants(instantTimes.subList(0, 6)), getActiveCommitInstants(instantTimes.subList(6, 12)), after);

        // Commit 13: still nothing is archived.
        String thirteenth = this.metaClient.createNewInstantTime();
        instantTimes.add(thirteenth);
        this.testTable.doWriteOperation(thirteenth, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        commits = archiveAndGetCommitsList(writeConfig);
        Assertions.assertEquals(commits.getKey(), commits.getValue());

        // Commit 14: four more commits get archived.
        String fourteenth = this.metaClient.createNewInstantTime();
        instantTimes.add(fourteenth);
        this.testTable.doWriteOperation(fourteenth, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
        commits = archiveAndGetCommitsList(writeConfig);
        before = commits.getKey();
        after = commits.getValue();
        Assertions.assertEquals(4, before.size() - after.size());
        verifyArchival(getAllArchivedCommitInstants(instantTimes.subList(0, 10)), getActiveCommitInstants(instantTimes.subList(10, 14)), after);
    }

    /**
     * Checks which delta commits stay on the active timeline of a MERGE_ON_READ table before
     * and after a compaction, using the config from
     * {@code initTestTableAndGetWriteConfig(z, 4, 5, 8, 1, HoodieTableType.MERGE_ON_READ)}.
     *
     * <p>NOTE(review): the numeric arguments presumably configure archival bounds and
     * delta-commit retention for compaction - confirm against the helper.
     *
     * <p>Phase one writes 9 delta commits, archiving after each. Phase two runs one compaction
     * and then writes 9 more delta commits, again archiving after each.
     *
     * @param z first argument forwarded to {@code initTestTableAndGetWriteConfig}
     * @throws Exception if writing test commits or archiving fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 8, 1, HoodieTableType.MERGE_ON_READ);
        List<String> instantTimes = new ArrayList<>();

        // Phase one: delta commits 1-9 with no compaction on the timeline yet.
        for (int round = 1; round < 10; round++) {
            String instantTime = this.metaClient.createNewInstantTime();
            instantTimes.add(instantTime);
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, round == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            Pair<List<HoodieInstant>, List<HoodieInstant>> commits = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = commits.getKey();
            List<HoodieInstant> after = commits.getValue();
            if (round <= 8) {
                // The first 8 delta commits are all retained.
                Assertions.assertEquals(before, after);
                continue;
            }
            // Ninth delta commit: exactly the oldest one is archived, the other eight remain.
            Assertions.assertEquals(1, before.size() - after.size());
            Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(0))));
            for (int n = 2; n < 10; n++) {
                Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(n - 1))));
            }
        }

        // Compaction is recorded as instant index 9 in instantTimes.
        String compactionTime = this.metaClient.createNewInstantTime();
        instantTimes.add(compactionTime);
        this.testTable.doCompaction(compactionTime, Arrays.asList("p1", "p2"));

        // Phase two: delta commits at indices 10-18, written after the compaction.
        for (int round = 1; round < 10; round++) {
            String instantTime = this.metaClient.createNewInstantTime();
            instantTimes.add(instantTime);
            this.testTable.doWriteOperation(instantTime, WriteOperationType.UPSERT, round == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            Pair<List<HoodieInstant>, List<HoodieInstant>> commits = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = commits.getKey();
            List<HoodieInstant> after = commits.getValue();
            int archivedCount = before.size() - after.size();

            if (round <= 2) {
                // The first seven delta commits are gone from the active timeline.
                for (int n = 1; n < 8; n++) {
                    Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(n - 1))));
                }
                Assertions.assertEquals(round == 1 ? 6 : 0, archivedCount);
                // The last two pre-compaction delta commits and the compaction commit remain.
                Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(7))));
                Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(8))));
                Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", compactionTime)));
                for (int n = 1; n <= round; n++) {
                    Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(9 + n))));
                }
            } else {
                // From the third post-compaction write on, no phase-one delta commit is active.
                for (int n = 1; n < 10; n++) {
                    Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(n - 1))));
                }
                if (round == 3) {
                    Assertions.assertEquals(2, archivedCount);
                } else if (round < 8) {
                    Assertions.assertEquals(before, after);
                } else {
                    Assertions.assertEquals(1, archivedCount);
                    Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", compactionTime)));
                    if (round == 9) {
                        Assertions.assertFalse(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(10))));
                    }
                    // The eight most recent post-compaction delta commits remain active.
                    for (int n = round - 7; n < round + 1; n++) {
                        Assertions.assertTrue(after.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", instantTimes.get(9 + n))));
                    }
                }
            }
        }
    }

    /**
     * Verifies that archival progresses in two steps around a pending replacecommit.
     *
     * <p>Timeline built by the test (14 write instants in total):
     * <ul>
     *   <li>completed commits 1000-1002;</li>
     *   <li>a requested replacecommit at 1003;</li>
     *   <li>completed commits 1004-1006;</li>
     *   <li>requested and pending commit files at 1007;</li>
     *   <li>completed commits 1010-1015.</li>
     * </ul>
     *
     * <p>The first archival is expected to remove only 1000 and 1001. After the requested
     * replacecommit file for 1003 is deleted, a second archival is expected to leave 1006 as
     * the earliest instant on the write timeline.
     *
     * @throws Exception if creating timeline files or archiving fails
     */
    @Test
    public void testGetCommitInstantsToArchiveDuringInflightCommits() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 3, 4, 2);
        // Instant times expected to be present on the active write timeline.
        HashSet<String> expectedInstants = new HashSet<>();

        for (int i = 0; i < 3; i++) {
            String instantTime = "100" + i;
            HoodieTestDataGenerator.createCommitFile(this.basePath, instantTime, this.wrapperFs.getConf());
            expectedInstants.add(instantTime);
        }
        HoodieTestDataGenerator.createReplaceCommitRequestedFile(this.basePath, "1003", this.wrapperFs.getConf());
        expectedInstants.add("1003");
        for (int i = 4; i < 7; i++) {
            String instantTime = "100" + i;
            HoodieTestDataGenerator.createCommitFile(this.basePath, instantTime, this.wrapperFs.getConf());
            expectedInstants.add(instantTime);
        }
        HoodieTestDataGenerator.createRequestedCommitFile(this.basePath, "1007", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createPendingCommitFile(this.basePath, "1007", this.wrapperFs.getConf());
        expectedInstants.add("1007");
        for (int i = 0; i < 6; i++) {
            String instantTime = "101" + i;
            HoodieTestDataGenerator.createCommitFile(this.basePath, instantTime, this.wrapperFs.getConf());
            expectedInstants.add(instantTime);
        }
        Assertions.assertEquals(expectedInstants.size(), this.metaClient.reloadActiveTimeline().getWriteTimeline().countInstants(), "Loaded 14 commits and the count should match");

        // First archival: only 1000 and 1001 are expected to leave the active timeline.
        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, HoodieSparkTable.create(writeConfig, this.context, this.metaClient));
        boolean archived = archiver.archiveIfRequired(this.context);
        expectedInstants.remove("1000");
        expectedInstants.remove("1001");
        Assertions.assertTrue(archived);
        HoodieDefaultTimeline firstTimeline = this.metaClient.reloadActiveTimeline().getWriteTimeline();
        Assertions.assertEquals(12, firstTimeline.countInstants(), "After archival only first 2 commits should be archived");
        Assertions.assertEquals(expectedInstants.size(), firstTimeline.countInstants(), "After archival only first 2 commits should be archived");
        Assertions.assertEquals(12L, expectedInstants.stream().filter(instantTime -> firstTimeline.containsInstant(instantTime)).count());
        Assertions.assertEquals("1002", firstTimeline.getInstantsAsStream().findFirst().get().getTimestamp());

        // Drop the pending replacecommit, then archive again.
        this.metaClient.getFs().delete(new Path(this.basePath + "/.hoodie/" + HoodieTimeline.makeRequestedReplaceFileName("1003")));
        this.metaClient.reloadActiveTimeline();
        Assertions.assertTrue(archiver.archiveIfRequired(this.context));
        HoodieDefaultTimeline secondTimeline = this.metaClient.reloadActiveTimeline().getWriteTimeline();
        expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005"));
        // The original reused the "first 2 commits" message here, which is wrong for this round.
        Assertions.assertEquals(8, secondTimeline.countInstants(), "After the pending replacecommit 1003 is removed, instants up to 1005 should leave the active timeline");
        Assertions.assertEquals(expectedInstants.size(), secondTimeline.countInstants(), "After the pending replacecommit 1003 is removed, instants up to 1005 should leave the active timeline");
        Assertions.assertEquals(8L, expectedInstants.stream().filter(instantTime -> secondTimeline.containsInstant(instantTime)).count());
        Assertions.assertEquals("1006", secondTimeline.getInstantsAsStream().findFirst().get().getTimestamp());
    }

    /**
     * Verifies that a pending replacecommit at the start of the timeline blocks archival.
     *
     * <p>The test creates requested and inflight replacecommit files at 1001 followed by
     * completed commits 1002-1009. After archival runs, all 9 instants must still be on the
     * active timeline and 1001 must still be the first one.
     *
     * @throws Exception if creating timeline files or archiving fails
     */
    @Test
    public void testWithOldestReplaceCommit() throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);

        // Pending replacecommit at the head of the timeline.
        HoodieTestDataGenerator.createReplaceCommitRequestedFile(this.basePath, "1001", this.wrapperFs.getConf());
        HoodieTestDataGenerator.createReplaceCommitInflightFile(this.basePath, "1001", this.wrapperFs.getConf());

        // Completed commits 1002 through 1009.
        int suffix = 2;
        while (suffix < 10) {
            String instantTime = "100" + suffix;
            HoodieTestDataGenerator.createCommitFile(this.basePath, instantTime, this.wrapperFs.getConf());
            suffix++;
        }

        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, HoodieSparkTable.create(writeConfig, this.context, this.metaClient));
        int loadedCount = this.metaClient.reloadActiveTimeline().countInstants();
        Assertions.assertEquals(9, loadedCount, "Loaded 9 commits and the count should match");

        boolean archiveResult = archiver.archiveIfRequired(this.context);
        Assertions.assertTrue(archiveResult);

        HoodieActiveTimeline timelineAfter = this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals(9, timelineAfter.countInstants(), "Since we have a pending replacecommit at 1001, we should never archive any commit after 1001");
        HoodieInstant earliest = (HoodieInstant) timelineAfter.getInstantsAsStream().findFirst().get();
        Assertions.assertEquals("1001", earliest.getTimestamp());
    }

    /**
     * Tracks the completed instants on the metadata table's active timeline while 18
     * operations (17 upserts and one rollback) are applied to a COPY_ON_WRITE data table,
     * with archival attempted after every operation.
     *
     * <p>The write config archives with {@code archiveCommitsWith(4, 6)}, retains 1 commit
     * for cleaning, enables the metadata table and sets
     * {@code withMaxNumDeltaCommitsBeforeCompaction(8)}.
     *
     * <p>NOTE(review): the per-iteration instant counts asserted below presumably follow from
     * metadata-table compaction and archival kicking in at specific points; the reasoning is
     * not visible in this method - confirm before changing any of the numbers.
     *
     * @throws Exception if table setup, writes, rollback or archival fail
     */
    @Test
    public void testArchivalAndCompactionInMetadataTable() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        // Inline Avro schema for the "triprec" record used by the test table.
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 
6).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(8).build()).forTable("test-trip-table").build();
        initWriteConfigAndMetatableWriter(build, true);
        // Separate meta client rooted at the metadata table's base path.
        HoodieTableMetaClient build2 = HoodieTableMetaClient.builder().setConf(this.metaClient.getHadoopConf()).setBasePath(HoodieTableMetadata.getMetadataTableBasePath(this.basePath)).setLoadActiveTimelineOnLoad(true).build();
        // Instant times of every data-table operation, in order (index = i - 1).
        ArrayList arrayList = new ArrayList();
        int i = 1;
        while (i <= 18) {
            String createNewInstantTime = this.metaClient.createNewInstantTime();
            arrayList.add(createNewInstantTime);
            if (i != 2) {
                this.testTable.doWriteOperation(createNewInstantTime, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
            } else {
                // The second operation rolls back the first commit instead of writing.
                this.testTable.doRollback((String) arrayList.get(0), createNewInstantTime);
            }
            archiveAndGetCommitsList(build);
            // Re-read the metadata table's timeline after each archival attempt.
            build2 = HoodieTableMetaClient.reload(build2);
            List instants = build2.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
            if (i == 1) {
                // Two completed instants: the fixed "00000000000000010" deltacommit
                // (presumably the metadata table's initialization commit - confirm) and the first write.
                Assertions.assertEquals(i + 1, instants.size());
                Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", "00000000000000010")));
                Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(0))));
            } else if (i <= 8) {
                // Operations 2-8: i completed instants, still including "00000000000000010".
                Assertions.assertEquals(i, instants.size());
                Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", "00000000000000010")));
                IntStream.range(2, i).forEach(i2 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i2 - 1))));
                });
            } else if (i == 9) {
                // Operation 9: nine completed instants; "00000000000000010" is no longer asserted.
                Assertions.assertEquals(9, instants.size());
                IntStream.range(2, i).forEach(i3 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i3 - 1))));
                });
            } else if (i <= 12) {
                // Operations 10-12: i - 4 completed instants, exactly one of them on the commit timeline.
                Assertions.assertEquals(i - 4, instants.size());
                Assertions.assertEquals(1, build2.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants());
                IntStream.range(6, i).forEach(i4 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i4 - 1))));
                });
            } else if (i <= 16) {
                // Operations 13-16: i - 7 completed instants, still one on the commit timeline.
                Assertions.assertEquals(i - 7, instants.size());
                Assertions.assertEquals(1, build2.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants());
                IntStream.range(9, i).forEach(i5 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i5 - 1))));
                });
            } else if (i == 17) {
                // Operation 17: eleven completed instants, two of them on the commit timeline.
                Assertions.assertEquals(11, instants.size());
                Assertions.assertEquals(2, build2.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants());
                IntStream.range(9, i).forEach(i6 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i6 - 1))));
                });
            } else {
                // Operation 18: six completed instants remain and a latest compaction time is reported.
                Assertions.assertEquals(6, instants.size());
                Assertions.assertTrue(metadata(build, this.context).getLatestCompactionTime().isPresent());
                IntStream.range(14, i).forEach(i7 -> {
                    Assertions.assertTrue(instants.contains(new HoodieInstant(HoodieInstant.State.COMPLETED, "deltacommit", (String) arrayList.get(i7 - 1))));
                });
            }
            i++;
        }
    }

    /**
     * Verifies that a pending replace instant at "00000002" prevents any commit from being
     * archived.
     *
     * <p>CLUSTER write operations are performed at "00000001" and "00000003" through
     * "00000007". Archival runs after each step, including the skipped "00000002" slot, and
     * must never change the completed commits. At the end, six completed commits are expected
     * on the active timeline.
     *
     * @param z first argument forwarded to {@code initTestTableAndGetWriteConfig}
     * @throws Exception if writing test commits or archiving fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPendingClusteringAfterArchiveCommit(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(z, 4, 5, 2);
        HoodieTestDataGenerator.createPendingReplaceFile(this.basePath, "00000002", this.wrapperFs.getConf());

        int seq = 1;
        while (seq < 8) {
            // Slot 2 is occupied by the pending replace instant, so no write happens there.
            if (seq != 2) {
                String instantTime = "0000000" + seq;
                this.testTable.doWriteOperation(instantTime, WriteOperationType.CLUSTER, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2);
            }
            Pair<List<HoodieInstant>, List<HoodieInstant>> commits = archiveAndGetCommitsList(writeConfig);
            List<HoodieInstant> before = commits.getKey();
            List<HoodieInstant> after = commits.getValue();
            Assertions.assertEquals(before, after);
            seq++;
        }

        int completedCommits = this.metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants().countInstants();
        Assertions.assertEquals(6, completedCommits, "Since we have a pending clustering instant at 00000002, we should never archive any commit after 00000000");
    }

    /**
     * Runs archival once and returns the completed commit instants on the active timeline
     * before (key) and after (value) the run.
     *
     * <p>Delegates to {@link #archiveAndGetCommitsList(HoodieWriteConfig, boolean)} with
     * {@code false}, so pending instants are filtered out.
     *
     * @param hoodieWriteConfig write config handed to the archiver
     * @return pair of (instants before archival, instants after archival)
     * @throws IOException if archival fails
     */
    private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig hoodieWriteConfig) throws IOException {
        return archiveAndGetCommitsList(hoodieWriteConfig, false);
    }

    /**
     * Runs the timeline archiver once and captures the commit instants on the active timeline
     * immediately before and after it.
     *
     * @param hoodieWriteConfig write config used to build the table and the archiver
     * @param z when {@code true}, every instant of the all-commits timeline is captured;
     *          when {@code false}, only completed instants are captured
     * @return pair whose key is the instant list before archival and whose value is the
     *         instant list after archival
     * @throws IOException if archival fails
     */
    private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig hoodieWriteConfig, boolean z) throws IOException {
        this.metaClient.reloadActiveTimeline();

        // Snapshot before archival.
        List<HoodieInstant> beforeArchival;
        if (z) {
            beforeArchival = this.metaClient.getActiveTimeline().reload().getAllCommitsTimeline().getInstants();
        } else {
            beforeArchival = this.metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants().getInstants();
        }

        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(hoodieWriteConfig, HoodieSparkTable.create(hoodieWriteConfig, this.context, this.metaClient));
        archiver.archiveIfRequired(this.context);

        // Snapshot after archival, taken from a freshly reloaded timeline.
        List<HoodieInstant> afterArchival;
        if (z) {
            afterArchival = this.metaClient.getActiveTimeline().reload().getAllCommitsTimeline().getInstants();
        } else {
            afterArchival = this.metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants().getInstants();
        }
        return Pair.of(beforeArchival, afterArchival);
    }

    /**
     * Asserts that the active and archived timelines match the expected split.
     *
     * <p>Checks, in order:
     * <ol>
     *   <li>{@code list3} equals {@code list2} once both are sorted by timestamp;</li>
     *   <li>no instant from {@code list} appears in {@code list3};</li>
     *   <li>the archived timeline read from {@code metaClient} equals {@code list} once both
     *       are sorted by timestamp;</li>
     *   <li>every non-rollback instant in {@code list} satisfies
     *       {@code containsOrBeforeTimelineStarts} on the meta client's active timeline.</li>
     * </ol>
     *
     * <p>All three argument lists are sorted in place, so they must be mutable.
     *
     * @param list  instants expected to have been archived
     * @param list2 instants expected to remain active
     * @param list3 instants actually found on the active timeline
     */
    private void verifyArchival(List<HoodieInstant> list, List<HoodieInstant> list2, List<HoodieInstant> list3) {
        Comparator<HoodieInstant> byTimestamp = Comparator.comparing(HoodieInstant::getTimestamp);

        list2.sort(byTimestamp);
        list3.sort(byTimestamp);
        Assertions.assertEquals(list2, list3);
        list.forEach(archivedInstant -> Assertions.assertFalse(list3.contains(archivedInstant)));

        // A typed list is required so the comparator's lambda parameter resolves to HoodieInstant.
        List<HoodieInstant> actualArchived = new HoodieArchivedTimeline(this.metaClient).getInstants();
        actualArchived.sort(byTimestamp);
        list.sort(byTimestamp);
        // Expected value first, so a failure message labels the two sides correctly.
        Assertions.assertEquals(list, actualArchived);

        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        for (HoodieInstant archivedInstant : list) {
            // Rollback instants are exempt from the check below.
            if ("rollback".equals(archivedInstant.getAction())) {
                continue;
            }
            Assertions.assertTrue(activeTimeline.containsOrBeforeTimelineStarts(archivedInstant.getTimestamp()), "Archived commits should always be safe");
        }
    }

    /**
     * Builds completed {@code "commit"} instants for the given instant times.
     *
     * @param list instant times
     * @return one completed commit instant per entry, in the same order
     */
    private List<HoodieInstant> getAllArchivedCommitInstants(List<String> list) {
        return getAllArchivedCommitInstants(list, "commit");
    }

    /**
     * Builds completed instants of the given action for the given instant times.
     *
     * @param list instant times
     * @param str  action name assigned to every instant
     * @return a new mutable list with one completed instant per entry, in the same order
     */
    private List<HoodieInstant> getAllArchivedCommitInstants(List<String> list, String str) {
        return list.stream()
                .map(instantTime -> new HoodieInstant(HoodieInstant.State.COMPLETED, str, instantTime))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds completed {@code "commit"} instants for the given instant times.
     *
     * @param list instant times
     * @return one completed commit instant per entry, in the same order
     */
    private List<HoodieInstant> getActiveCommitInstants(List<String> list) {
        return getActiveCommitInstants(list, "commit");
    }

    /**
     * Builds completed {@code "savepoint"} instants for the given instant times.
     *
     * @param list instant times
     * @return one completed savepoint instant per entry, in the same order
     */
    private List<HoodieInstant> getActiveSavepointedCommitInstants(List<String> list) {
        return getActiveCommitInstants(list, "savepoint");
    }

    /**
     * Builds completed instants of the given action for the given instant times.
     *
     * @param list instant times
     * @param str  action name assigned to every instant
     * @return a new mutable list with one completed instant per entry, in the same order
     */
    private List<HoodieInstant> getActiveCommitInstants(List<String> list, String str) {
        return list.stream()
                .map(instantTime -> new HoodieInstant(HoodieInstant.State.COMPLETED, str, instantTime))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Creates a commit file and rollback metadata for it, passing {@code false} as the last
     * flag of the four-argument overload.
     *
     * @param str  instant time of the commit to create
     * @param str2 instant time of the rollback
     * @param z    when {@code true}, only an inflight rollback is added
     * @throws IOException if writing the timeline files fails
     */
    private void createCommitAndRollbackFile(String str, String str2, boolean z) throws IOException {
        createCommitAndRollbackFile(str, str2, z, false);
    }

    /**
     * Creates a commit file at {@code str}, then creates rollback metadata at {@code str2}
     * that targets that commit.
     *
     * @param str  instant time of the commit to create
     * @param str2 instant time of the rollback
     * @param z    when {@code true}, only an inflight rollback is added
     * @param z2   flag forwarded unchanged to {@code createRollbackMetadata}
     * @throws IOException if writing the timeline files fails
     */
    private void createCommitAndRollbackFile(String str, String str2, boolean z, boolean z2) throws IOException {
        // The commit must exist before the rollback metadata that references it is written.
        HoodieTestDataGenerator.createCommitFile(this.basePath, str, this.wrapperFs.getConf());
        // The returned rollback instant is not needed here.
        createRollbackMetadata(str2, str, z, z2);
    }

    /**
     * Adds a rollback at instant time {@code str} to the test table and returns the matching
     * {@code "rollback"} instant.
     *
     * <p>When {@code z} is {@code true} only an inflight rollback is added. Otherwise rollback
     * metadata naming {@code str2} as the rolled-back commit is built, and the rollback is
     * added and then marked completed.
     *
     * @param str  instant time of the rollback
     * @param str2 instant time of the commit being rolled back
     * @param z    whether the rollback is left inflight
     * @param z2   flag forwarded to {@code addRollback} and {@code addRollbackCompleted}
     * @return a rollback instant at {@code str}, constructed with {@code z} as its first argument
     * @throws IOException if writing the rollback files fails
     */
    private HoodieInstant createRollbackMetadata(String str, String str2, boolean z, boolean z2) throws IOException {
        if (!z) {
            HoodieRollbackMetadata rollbackMetadata = HoodieRollbackMetadata.newBuilder()
                    .setVersion(1)
                    .setStartRollbackTime(str)
                    .setTotalFilesDeleted(1)
                    .setTimeTakenInMillis(1000L)
                    .setCommitsRollback(Collections.singletonList(str2))
                    .setPartitionMetadata(Collections.emptyMap())
                    .setInstantsRollback(Collections.emptyList())
                    .build();
            HoodieTestTable.of(this.metaClient).addRollback(str, rollbackMetadata, z2, (HoodieRollbackPlan) null);
            HoodieTestTable.of(this.metaClient).addRollbackCompleted(str, rollbackMetadata, z2);
        } else {
            HoodieTestTable.of(this.metaClient).addInflightRollback(str);
        }
        return new HoodieInstant(z, "rollback", str);
    }

    /**
     * Asserts that two instant lists have the same size and that instants at the same
     * position agree on timestamp, action and state, checked in that order.
     *
     * @param list  first list of instants
     * @param list2 second list of instants
     */
    private void assertInstantListEquals(List<HoodieInstant> list, List<HoodieInstant> list2) {
        Assertions.assertEquals(list.size(), list2.size());
        IntStream.range(0, list.size()).forEach(index -> {
            HoodieInstant left = list.get(index);
            HoodieInstant right = list2.get(index);
            Assertions.assertEquals(left.getTimestamp(), right.getTimestamp());
            Assertions.assertEquals(left.getAction(), right.getAction());
            Assertions.assertEquals(left.getState(), right.getState());
        });
    }
}
