package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.class */
public class TestExpireSnapshotsAction extends SparkTestBase {

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private File tableDir;
    private String tableLocation;
    private Table table;
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    private static final int SHUFFLE_PARTITIONS = 2;
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "c1", Types.IntegerType.get()), Types.NestedField.optional(SHUFFLE_PARTITIONS, "c2", Types.StringType.get()), Types.NestedField.optional(3, "c3", Types.StringType.get())});
    private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
    static final DataFile FILE_A = DataFiles.builder(SPEC).withPath("/path/to/data-a.parquet").withFileSizeInBytes(10).withPartitionPath("c1=0").withRecordCount(1).build();
    static final DataFile FILE_B = DataFiles.builder(SPEC).withPath("/path/to/data-b.parquet").withFileSizeInBytes(10).withPartitionPath("c1=1").withRecordCount(1).build();
    static final DataFile FILE_C = DataFiles.builder(SPEC).withPath("/path/to/data-c.parquet").withFileSizeInBytes(10).withPartitionPath("c1=2").withRecordCount(1).build();
    static final DataFile FILE_D = DataFiles.builder(SPEC).withPath("/path/to/data-d.parquet").withFileSizeInBytes(10).withPartitionPath("c1=3").withRecordCount(1).build();

    @Before
    public void setupTableLocation() throws Exception {
        this.tableDir = this.temp.newFolder();
        this.tableLocation = this.tableDir.toURI().toString();
        this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        spark.conf().set("spark.sql.shuffle.partitions", 2L);
    }

    private Long rightAfterSnapshot() {
        return rightAfterSnapshot(this.table.currentSnapshot().snapshotId());
    }

    private Long rightAfterSnapshot(long j) {
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        while (true) {
            Long l = valueOf;
            if (l.longValue() > this.table.snapshot(j).timestampMillis()) {
                return l;
            }
            valueOf = Long.valueOf(System.currentTimeMillis());
        }
    }

    private void checkExpirationResults(long j, long j2, long j3, ExpireSnapshots.Result result) {
        Assert.assertEquals("Incorrect number of manifest files deleted", j2, result.deletedManifestsCount());
        Assert.assertEquals("Incorrect number of datafiles deleted", j, result.deletedDataFilesCount());
        Assert.assertEquals("Incorrect number of manifest lists deleted", j3, result.deletedManifestListsCount());
    }

    @Test
    public void testFilesCleaned() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        this.table.newFastAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(rightAfterSnapshot().longValue()).execute();
        Assert.assertEquals("Table does not have 1 snapshot after expiration", 1L, Iterables.size(this.table.snapshots()));
        checkExpirationResults(1L, 1L, 2L, result);
    }

    @Test
    public void dataFilesCleanupWithParallelTasks() throws IOException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newFastAppend().appendFile(FILE_B).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("remove-snapshot-" + atomicInteger.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        })).expireOlderThan(longValue).deleteWith(str -> {
            newKeySet.add(Thread.currentThread().getName());
            newHashSet.add(str);
        }).execute();
        Assert.assertEquals(newKeySet, Sets.newHashSet(new String[]{"remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"}));
        Assert.assertTrue("FILE_A should be deleted", newHashSet.contains(FILE_A.path().toString()));
        Assert.assertTrue("FILE_B should be deleted", newHashSet.contains(FILE_B.path().toString()));
        checkExpirationResults(2L, 3L, 3L, result);
    }

    @Test
    public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        checkExpirationResults(0L, 0L, 0L, (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).execute());
    }

    @Test
    public void testCleanupRepeatedOverwrites() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        for (int i = 0; i < 10; i++) {
            this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
            this.table.newOverwrite().deleteFile(FILE_B).addFile(FILE_A).commit();
        }
        checkExpirationResults(1L, 39L, 20L, (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(rightAfterSnapshot().longValue()).execute());
    }

    @Test
    public void testRetainLastWithExpireOlderThan() {
        this.table.newAppend().appendFile(FILE_A).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        for (long currentTimeMillis = System.currentTimeMillis(); currentTimeMillis <= this.table.currentSnapshot().timestampMillis(); currentTimeMillis = System.currentTimeMillis()) {
        }
        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_C).commit();
        SparkActions.get().expireSnapshots(this.table).expireOlderThan(rightAfterSnapshot().longValue()).retainLast(SHUFFLE_PARTITIONS).execute();
        Assert.assertEquals("Should have two snapshots.", 2L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertEquals("First snapshot should not present.", (Object) null, this.table.snapshot(snapshotId));
    }

    @Test
    public void testExpireTwoSnapshotsById() throws Exception {
        this.table.newAppend().appendFile(FILE_A).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_B).commit();
        long snapshotId2 = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireSnapshotId(snapshotId).expireSnapshotId(snapshotId2).execute();
        Assert.assertEquals("Should have one snapshots.", 1L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertEquals("First snapshot should not present.", (Object) null, this.table.snapshot(snapshotId));
        Assert.assertEquals("Second snapshot should not be present.", (Object) null, this.table.snapshot(snapshotId2));
        checkExpirationResults(0L, 0L, 2L, result);
    }

    @Test
    public void testRetainLastWithExpireById() {
        this.table.newAppend().appendFile(FILE_A).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireSnapshotId(snapshotId).retainLast(3).execute();
        Assert.assertEquals("Should have two snapshots.", 2L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertEquals("First snapshot should not present.", (Object) null, this.table.snapshot(snapshotId));
        checkExpirationResults(0L, 0L, 1L, result);
    }

    @Test
    public void testRetainLastWithTooFewSnapshots() {
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(rightAfterSnapshot().longValue()).retainLast(3).execute();
        Assert.assertEquals("Should have two snapshots", 2L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertEquals("First snapshot should still present", snapshotId, this.table.snapshot(snapshotId).snapshotId());
        checkExpirationResults(0L, 0L, 0L, result);
    }

    @Test
    public void testRetainLastKeepsExpiringSnapshot() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        this.table.newAppend().appendFile(FILE_D).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(currentSnapshot.timestampMillis()).retainLast(SHUFFLE_PARTITIONS).execute();
        Assert.assertEquals("Should have three snapshots.", 3L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertNotNull("Second snapshot should present.", this.table.snapshot(currentSnapshot.snapshotId()));
        checkExpirationResults(0L, 0L, 1L, result);
    }

    @Test
    public void testExpireSnapshotsWithDisabledGarbageCollection() {
        this.table.updateProperties().set("gc.enabled", "false").commit();
        this.table.newAppend().appendFile(FILE_A).commit();
        AssertHelpers.assertThrows("Should complain about expiring snapshots", ValidationException.class, "Cannot expire snapshots: GC is disabled", () -> {
            return SparkActions.get().expireSnapshots(this.table);
        });
    }

    @Test
    public void testExpireOlderThanMultipleCalls() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(currentSnapshot.timestampMillis()).expireOlderThan(this.table.currentSnapshot().timestampMillis()).execute();
        Assert.assertEquals("Should have one snapshots.", 1L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertNull("Second snapshot should not present.", this.table.snapshot(currentSnapshot.snapshotId()));
        checkExpirationResults(0L, 0L, 2L, result);
    }

    @Test
    public void testRetainLastMultipleCalls() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).expireOlderThan(rightAfterSnapshot().longValue()).retainLast(SHUFFLE_PARTITIONS).retainLast(1).execute();
        Assert.assertEquals("Should have one snapshots.", 1L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertNull("Second snapshot should not present.", this.table.snapshot(currentSnapshot.snapshotId()));
        checkExpirationResults(0L, 0L, 2L, result);
    }

    @Test
    public void testRetainZeroSnapshots() {
        AssertHelpers.assertThrows("Should fail retain 0 snapshots because number of snapshots to retain cannot be zero", IllegalArgumentException.class, "Number of snapshots to retain must be at least 1, cannot be: 0", () -> {
            return (ExpireSnapshots.Result) SparkActions.get().expireSnapshots(this.table).retainLast(0).execute();
        });
    }

    @Test
    public void testScanExpiredManifestInValidSnapshotAppend() {
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        this.table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_D).commit();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertTrue("FILE_A should be deleted", newHashSet.contains(FILE_A.path().toString()));
        checkExpirationResults(1L, 1L, 2L, result);
    }

    @Test
    public void testScanExpiredManifestInValidSnapshotFastAppend() {
        this.table.updateProperties().set("commit.manifest-merge.enabled", "true").set("commit.manifest.min-count-to-merge", "1").commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        this.table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        this.table.newFastAppend().appendFile(FILE_D).commit();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertTrue("FILE_A should be deleted", newHashSet.contains(FILE_A.path().toString()));
        checkExpirationResults(1L, 1L, 2L, result);
    }

    @Test
    public void testWithExpiringDanglingStageCommit() {
        this.table.newAppend().appendFile(FILE_A).commit();
        ((AppendFiles) this.table.newAppend().appendFile(FILE_B).stageOnly()).commit();
        TableMetadata current = this.table.operations().current();
        Snapshot snapshot = (Snapshot) current.snapshots().get(0);
        Snapshot snapshot2 = (Snapshot) current.snapshots().get(1);
        this.table.newAppend().appendFile(FILE_C).commit();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireSnapshots = SparkActions.get().expireSnapshots(this.table);
        Objects.requireNonNull(newHashSet);
        checkExpirationResults(1L, 1L, 2L, (ExpireSnapshots.Result) expireSnapshots.deleteWith((v1) -> {
            r1.add(v1);
        }).expireOlderThan(snapshot2.timestampMillis() + 1).execute());
        HashSet newHashSet2 = Sets.newHashSet();
        newHashSet2.add(snapshot.manifestListLocation());
        snapshot2.addedFiles().forEach(dataFile -> {
            newHashSet2.add(dataFile.path().toString());
        });
        newHashSet2.add(snapshot2.manifestListLocation());
        snapshot2.dataManifests().forEach(manifestFile -> {
            if (manifestFile.snapshotId().longValue() == snapshot2.snapshotId()) {
                newHashSet2.add(manifestFile.path());
            }
        });
        Assert.assertSame("Files deleted count should be expected", Integer.valueOf(newHashSet2.size()), Integer.valueOf(newHashSet.size()));
        newHashSet2.removeAll(newHashSet);
        Assert.assertTrue("Exactly same files should be deleted", newHashSet2.isEmpty());
    }

    @Test
    public void testWithCherryPickTableSnapshot() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        HashSet newHashSet = Sets.newHashSet();
        OverwriteFiles deleteFile = this.table.newOverwrite().addFile(FILE_B).deleteFile(FILE_A);
        Objects.requireNonNull(newHashSet);
        ((OverwriteFiles) deleteFile.deleteWith((v1) -> {
            r1.add(v1);
        })).commit();
        Assert.assertTrue("No files should be physically deleted", newHashSet.isEmpty());
        Snapshot currentSnapshot2 = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        Snapshot currentSnapshot3 = this.table.currentSnapshot();
        this.table.manageSnapshots().setCurrentSnapshot(currentSnapshot.snapshotId()).commit();
        this.table.manageSnapshots().cherrypick(currentSnapshot2.snapshotId()).commit();
        Snapshot currentSnapshot4 = this.table.currentSnapshot();
        this.table.manageSnapshots().setCurrentSnapshot(currentSnapshot3.snapshotId()).commit();
        ArrayList newArrayList = Lists.newArrayList();
        ExpireSnapshots expireSnapshots = SparkActions.get().expireSnapshots(this.table);
        Objects.requireNonNull(newArrayList);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireSnapshots.deleteWith((v1) -> {
            r1.add(v1);
        }).expireOlderThan(currentSnapshot3.timestampMillis() + 1).execute();
        Lists.newArrayList(new Snapshot[]{currentSnapshot2, currentSnapshot3, currentSnapshot4}).forEach(snapshot -> {
            snapshot.addedFiles().forEach(dataFile -> {
                Assert.assertFalse(newArrayList.contains(dataFile.path().toString()));
            });
        });
        checkExpirationResults(1L, 2L, 2L, result);
    }

    @Test
    public void testWithExpiringStagedThenCherrypick() {
        this.table.newAppend().appendFile(FILE_A).commit();
        ((AppendFiles) this.table.newAppend().appendFile(FILE_B).stageOnly()).commit();
        Snapshot snapshot = (Snapshot) this.table.operations().current().snapshots().get(1);
        this.table.newAppend().appendFile(FILE_C).commit();
        this.table.manageSnapshots().cherrypick(snapshot.snapshotId()).commit();
        Snapshot snapshot2 = (Snapshot) this.table.operations().current().snapshots().get(3);
        ArrayList newArrayList = Lists.newArrayList();
        ExpireSnapshots expireSnapshots = SparkActions.get().expireSnapshots(this.table);
        Objects.requireNonNull(newArrayList);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireSnapshots.deleteWith((v1) -> {
            r1.add(v1);
        }).expireSnapshotId(snapshot.snapshotId()).execute();
        Lists.newArrayList(new Snapshot[]{snapshot}).forEach(snapshot3 -> {
            snapshot3.addedFiles().forEach(dataFile -> {
                Assert.assertFalse(newArrayList.contains(dataFile.path().toString()));
            });
        });
        checkExpirationResults(0L, 1L, 1L, result);
        ExpireSnapshots expireSnapshots2 = SparkActions.get().expireSnapshots(this.table);
        Objects.requireNonNull(newArrayList);
        ExpireSnapshots.Result result2 = (ExpireSnapshots.Result) expireSnapshots2.deleteWith((v1) -> {
            r1.add(v1);
        }).expireOlderThan(this.table.currentSnapshot().timestampMillis() + 1).execute();
        Lists.newArrayList(new Snapshot[]{snapshot, snapshot2}).forEach(snapshot4 -> {
            snapshot4.addedFiles().forEach(dataFile -> {
                Assert.assertFalse(newArrayList.contains(dataFile.path().toString()));
            });
        });
        checkExpirationResults(0L, 0L, 2L, result2);
    }

    @Test
    public void testExpireOlderThan() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        rightAfterSnapshot();
        this.table.newAppend().appendFile(FILE_B).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertEquals("Should remove only the expired manifest list location", Sets.newHashSet(new String[]{currentSnapshot.manifestListLocation()}), newHashSet);
        checkExpirationResults(0L, 0L, 1L, result);
    }

    @Test
    public void testExpireOlderThanWithDelete() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, currentSnapshot.allManifests().size());
        rightAfterSnapshot();
        this.table.newDelete().deleteFile(FILE_A).commit();
        Snapshot currentSnapshot2 = this.table.currentSnapshot();
        Assert.assertEquals("Should create replace manifest with a rewritten manifest", 1L, currentSnapshot2.allManifests().size());
        this.table.newAppend().appendFile(FILE_B).commit();
        rightAfterSnapshot();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the second oldest snapshot", this.table.snapshot(currentSnapshot2.snapshotId()));
        Assert.assertEquals("Should remove expired manifest lists and deleted data file", Sets.newHashSet(new CharSequence[]{currentSnapshot.manifestListLocation(), ((ManifestFile) currentSnapshot.allManifests().get(0)).path(), currentSnapshot2.manifestListLocation(), ((ManifestFile) currentSnapshot2.allManifests().get(0)).path(), FILE_A.path()}), newHashSet);
        checkExpirationResults(1L, 2L, 2L, result);
    }

    @Test
    public void testExpireOlderThanWithDeleteInMergedManifests() {
        this.table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, currentSnapshot.allManifests().size());
        rightAfterSnapshot();
        this.table.newDelete().deleteFile(FILE_A).commit();
        Snapshot currentSnapshot2 = this.table.currentSnapshot();
        Assert.assertEquals("Should replace manifest with a rewritten manifest", 1L, currentSnapshot2.allManifests().size());
        this.table.newFastAppend().appendFile(FILE_C).commit();
        rightAfterSnapshot();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the second oldest snapshot", this.table.snapshot(currentSnapshot2.snapshotId()));
        Assert.assertEquals("Should remove expired manifest lists and deleted data file", Sets.newHashSet(new CharSequence[]{currentSnapshot.manifestListLocation(), ((ManifestFile) currentSnapshot.allManifests().get(0)).path(), currentSnapshot2.manifestListLocation(), FILE_A.path()}), newHashSet);
        checkExpirationResults(1L, 1L, 2L, result);
    }

    @Test
    public void testExpireOlderThanWithRollback() {
        this.table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, currentSnapshot.allManifests().size());
        rightAfterSnapshot();
        this.table.newDelete().deleteFile(FILE_B).commit();
        Snapshot currentSnapshot2 = this.table.currentSnapshot();
        HashSet newHashSet = Sets.newHashSet(currentSnapshot2.allManifests());
        newHashSet.removeAll(currentSnapshot.allManifests());
        Assert.assertEquals("Should add one new manifest for append", 1L, newHashSet.size());
        this.table.manageSnapshots().rollbackTo(currentSnapshot.snapshotId()).commit();
        long longValue = rightAfterSnapshot(currentSnapshot2.snapshotId()).longValue();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        HashSet newHashSet2 = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet2);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNotNull("Expire should keep the oldest snapshot, current", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the orphaned snapshot", this.table.snapshot(currentSnapshot2.snapshotId()));
        Assert.assertEquals("Should remove expired manifest lists and reverted appended data file", Sets.newHashSet(new String[]{currentSnapshot2.manifestListLocation(), ((ManifestFile) Iterables.getOnlyElement(newHashSet)).path()}), newHashSet2);
        checkExpirationResults(0L, 1L, 1L, result);
    }

    @Test
    public void testExpireOlderThanWithRollbackAndMergedManifests() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, currentSnapshot.allManifests().size());
        rightAfterSnapshot();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot currentSnapshot2 = this.table.currentSnapshot();
        HashSet newHashSet = Sets.newHashSet(currentSnapshot2.allManifests());
        newHashSet.removeAll(currentSnapshot.allManifests());
        Assert.assertEquals("Should add one new manifest for append", 1L, newHashSet.size());
        this.table.manageSnapshots().rollbackTo(currentSnapshot.snapshotId()).commit();
        long longValue = rightAfterSnapshot(currentSnapshot2.snapshotId()).longValue();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        HashSet newHashSet2 = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet2);
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNotNull("Expire should keep the oldest snapshot, current", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the orphaned snapshot", this.table.snapshot(currentSnapshot2.snapshotId()));
        Assert.assertEquals("Should remove expired manifest lists and reverted appended data file", Sets.newHashSet(new CharSequence[]{currentSnapshot2.manifestListLocation(), ((ManifestFile) Iterables.getOnlyElement(newHashSet)).path(), FILE_B.path()}), newHashSet2);
        checkExpirationResults(1L, 1L, 1L, result);
    }

    @Test
    public void testExpireOnEmptyTable() {
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(System.currentTimeMillis());
        Objects.requireNonNull(newHashSet);
        checkExpirationResults(0L, 0L, 0L, (ExpireSnapshots.Result) expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        }).execute());
    }

    @Test
    public void testExpireAction() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        rightAfterSnapshot();
        this.table.newAppend().appendFile(FILE_B).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long longValue = rightAfterSnapshot().longValue();
        HashSet newHashSet = Sets.newHashSet();
        ExpireSnapshots expireOlderThan = SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue);
        Objects.requireNonNull(newHashSet);
        BaseExpireSnapshotsSparkAction deleteWith = expireOlderThan.deleteWith((v1) -> {
            r1.add(v1);
        });
        Dataset expire = deleteWith.expire();
        List collectAsList = expire.collectAsList();
        Assert.assertEquals("Should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Should remove the oldest snapshot", this.table.snapshot(currentSnapshot.snapshotId()));
        Assert.assertEquals("Pending deletes should contain one row", 1L, collectAsList.size());
        Assert.assertEquals("Pending delete should be the expired manifest list location", currentSnapshot.manifestListLocation(), ((Row) collectAsList.get(0)).getString(0));
        Assert.assertEquals("Pending delete should be a manifest list", "Manifest List", ((Row) collectAsList.get(0)).getString(1));
        Assert.assertEquals("Should not delete any files", 0L, newHashSet.size());
        Assert.assertSame("Multiple calls to expire should return the same deleted files", expire, deleteWith.expire());
    }

    @Test
    public void testUseLocalIterator() {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        this.table.newFastAppend().appendFile(FILE_C).commit();
        long longValue = rightAfterSnapshot().longValue();
        int i = spark.sparkContext().dagScheduler().nextJobId().get();
        ExpireSnapshots.Result result = (ExpireSnapshots.Result) ((ExpireSnapshots) SparkActions.get().expireSnapshots(this.table).expireOlderThan(longValue).option("stream-results", "true")).execute();
        Assert.assertEquals("Table does not have 1 snapshot after expiration", 1L, Iterables.size(this.table.snapshots()));
        int i2 = spark.sparkContext().dagScheduler().nextJobId().get() - i;
        checkExpirationResults(1L, 1L, 2L, result);
        Assert.assertTrue(String.format("Expected more than %d jobs when using local iterator, ran %d", Integer.valueOf(SHUFFLE_PARTITIONS), Integer.valueOf(i2)), i2 > SHUFFLE_PARTITIONS);
    }
}
