package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.class */
public class TestRemoveOrphanFilesAction extends SparkTestBase {
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    protected static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "c1", Types.IntegerType.get()), Types.NestedField.optional(2, "c2", Types.StringType.get()), Types.NestedField.optional(3, "c3", Types.StringType.get())});
    protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).truncate("c2", 2).identity("c3").build();

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private File tableDir = null;
    protected String tableLocation = null;

    @Before
    public void setupTableLocation() throws Exception {
        this.tableDir = this.temp.newFolder();
        this.tableLocation = this.tableDir.toURI().toString();
    }

    @Test
    public void testDryRun() throws IOException, InterruptedException {
        Table create = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset coalesce = spark.createDataFrame(newArrayList, ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        List collectAsList = spark.read().format("iceberg").load(this.tableLocation + "#files").select("file_path", new String[0]).as(Encoders.STRING()).collectAsList();
        Assert.assertEquals("Should be 2 valid files", 2L, collectAsList.size());
        coalesce.write().mode("append").parquet(this.tableLocation + "/data");
        Path path = new Path(this.tableLocation + "/data");
        FileSystem fileSystem = path.getFileSystem(spark.sessionState().newHadoopConf());
        List list = (List) Arrays.stream(fileSystem.listStatus(path, HiddenPathFilter.get())).filter((v0) -> {
            return v0.isFile();
        }).map(fileStatus -> {
            return fileStatus.getPath().toString();
        }).collect(Collectors.toList());
        Assert.assertEquals("Should be 3 files", 3L, list.size());
        ArrayList newArrayList2 = Lists.newArrayList(list);
        newArrayList2.removeAll(collectAsList);
        Assert.assertEquals("Should be 1 invalid file", 1L, newArrayList2.size());
        Thread.sleep(1000L);
        SparkActions sparkActions = SparkActions.get();
        Assert.assertTrue("Default olderThan interval should be safe", Iterables.isEmpty(((DeleteOrphanFiles.Result) sparkActions.deleteOrphanFiles(create).deleteWith(str -> {
        }).execute()).orphanFileLocations()));
        Assert.assertEquals("Action should find 1 file", newArrayList2, ((DeleteOrphanFiles.Result) sparkActions.deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).deleteWith(str2 -> {
        }).execute()).orphanFileLocations());
        Assert.assertTrue("Invalid file should be present", fileSystem.exists(new Path((String) newArrayList2.get(0))));
        Assert.assertEquals("Action should delete 1 file", newArrayList2, ((DeleteOrphanFiles.Result) sparkActions.deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations());
        Assert.assertFalse("Invalid file should not be present", fileSystem.exists(new Path((String) newArrayList2.get(0))));
        ArrayList newArrayList3 = Lists.newArrayList();
        newArrayList3.addAll(newArrayList);
        newArrayList3.addAll(newArrayList);
        Assert.assertEquals("Rows must match", newArrayList3, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    @Test
    public void testAllValidFilesAreKept() throws IOException, InterruptedException {
        Table create = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Dataset coalesce = spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("overwrite").save(this.tableLocation);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(create.snapshots());
        List<String> snapshotFiles = snapshotFiles(((Snapshot) newArrayList.get(0)).snapshotId());
        Assert.assertEquals(1L, snapshotFiles.size());
        List<String> snapshotFiles2 = snapshotFiles(((Snapshot) newArrayList.get(1)).snapshotId());
        Assert.assertEquals(1L, snapshotFiles2.size());
        List<String> snapshotFiles3 = snapshotFiles(((Snapshot) newArrayList.get(2)).snapshotId());
        Assert.assertEquals(2L, snapshotFiles3.size());
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/invalid/invalid");
        Thread.sleep(1000L);
        Assert.assertEquals("Should delete 4 files", 4L, Iterables.size(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
        FileSystem fileSystem = new Path(this.tableLocation + "/data").getFileSystem(spark.sessionState().newHadoopConf());
        Iterator<String> it = snapshotFiles.iterator();
        while (it.hasNext()) {
            Assert.assertTrue("All snapshot files must remain", fileSystem.exists(new Path(it.next())));
        }
        Iterator<String> it2 = snapshotFiles2.iterator();
        while (it2.hasNext()) {
            Assert.assertTrue("All snapshot files must remain", fileSystem.exists(new Path(it2.next())));
        }
        Iterator<String> it3 = snapshotFiles3.iterator();
        while (it3.hasNext()) {
            Assert.assertTrue("All snapshot files must remain", fileSystem.exists(new Path(it3.next())));
        }
    }

    @Test
    public void orphanedFileRemovedWithParallelTasks() throws InterruptedException, IOException {
        Table create = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Dataset coalesce = spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("overwrite").save(this.tableLocation);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        coalesce.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/invalid/invalid");
        Thread.sleep(1000L);
        HashSet newHashSet = Sets.newHashSet();
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Assert.assertEquals(newKeySet, Sets.newHashSet(new String[]{"remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"}));
        Assert.assertEquals("Should delete 4 files", 4L, newHashSet.size());
    }

    @Test
    public void testWapFilesAreKept() throws InterruptedException {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("write.wap.enabled", "true");
        Table create = TABLES.create(SCHEMA, SPEC, newHashMap, this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset createDataFrame = spark.createDataFrame(newArrayList, ThreeColumnRecord.class);
        createDataFrame.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        spark.conf().set("spark.wap.id", "1");
        createDataFrame.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Assert.assertEquals("Should not return data from the staged snapshot", newArrayList, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
        Thread.sleep(1000L);
        Assert.assertTrue("Should not delete any files", Iterables.isEmpty(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
    }

    @Test
    public void testMetadataFolderIsIntact() throws InterruptedException {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("write.data.path", this.tableLocation);
        Table create = TABLES.create(SCHEMA, SPEC, newHashMap, this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset coalesce = spark.createDataFrame(newArrayList, ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        coalesce.write().mode("append").parquet(this.tableLocation + "/c2_trunc=AA/c3=AAAA");
        Thread.sleep(1000L);
        Assert.assertEquals("Should delete 1 file", 1L, Iterables.size(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
        Assert.assertEquals("Rows must match", newArrayList, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    @Test
    public void testOlderThanTimestamp() throws InterruptedException {
        Table create = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        Dataset coalesce = spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        coalesce.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        coalesce.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        Thread.sleep(1000L);
        long currentTimeMillis = System.currentTimeMillis();
        Thread.sleep(1000L);
        coalesce.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        Assert.assertEquals("Should delete only 2 files", 2L, Iterables.size(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(currentTimeMillis).execute()).orphanFileLocations()));
    }

    @Test
    public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedException {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("write.data.path", this.tableLocation);
        newHashMap.put("write.metadata.previous-versions-max", "1");
        Table create = TABLES.create(SCHEMA, SPEC, newHashMap, this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset createDataFrame = spark.createDataFrame(newArrayList, ThreeColumnRecord.class);
        createDataFrame.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        createDataFrame.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Thread.sleep(1000L);
        DeleteOrphanFiles.Result result = (DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 1 file", 1L, Iterables.size(result.orphanFileLocations()));
        Assert.assertTrue("Should remove v1 file", StreamSupport.stream(result.orphanFileLocations().spliterator(), false).anyMatch(str -> {
            return str.contains("v1.metadata.json");
        }));
        ArrayList newArrayList2 = Lists.newArrayList();
        newArrayList2.addAll(newArrayList);
        newArrayList2.addAll(newArrayList);
        Assert.assertEquals("Rows must match", newArrayList2, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    @Test
    public void testManyTopLevelPartitions() throws InterruptedException {
        Table create = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            newArrayList.add(new ThreeColumnRecord(Integer.valueOf(i), String.valueOf(i), String.valueOf(i)));
        }
        spark.createDataFrame(newArrayList, ThreeColumnRecord.class).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Thread.sleep(1000L);
        Assert.assertTrue("Should not delete any files", Iterables.isEmpty(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
        Assert.assertEquals("Rows must match", newArrayList, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    @Test
    public void testManyLeafPartitions() throws InterruptedException {
        Table create = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            newArrayList.add(new ThreeColumnRecord(Integer.valueOf(i), String.valueOf(i % 3), String.valueOf(i)));
        }
        spark.createDataFrame(newArrayList, ThreeColumnRecord.class).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Thread.sleep(1000L);
        Assert.assertTrue("Should not delete any files", Iterables.isEmpty(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
        Assert.assertEquals("Rows must match", newArrayList, spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    private List<String> snapshotFiles(long j) {
        return spark.read().format("iceberg").option("snapshot-id", j).load(this.tableLocation + "#files").select("file_path", new String[0]).as(Encoders.STRING()).collectAsList();
    }

    @Test
    public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, InterruptedException {
        Table create = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableDir.getAbsolutePath());
        Dataset coalesce = spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableDir.getAbsolutePath());
        List collectAsList = spark.read().format("iceberg").load(this.tableLocation + "#files").select("file_path", new String[0]).as(Encoders.STRING()).collectAsList();
        Assert.assertEquals("Should be 1 valid files", 1L, collectAsList.size());
        String str = (String) collectAsList.get(0);
        coalesce.write().mode("append").parquet(this.tableLocation + "/data");
        Path path = new Path(this.tableLocation + "/data");
        FileSystem fileSystem = path.getFileSystem(spark.sessionState().newHadoopConf());
        List list = (List) Arrays.stream(fileSystem.listStatus(path, HiddenPathFilter.get())).filter((v0) -> {
            return v0.isFile();
        }).map(fileStatus -> {
            return fileStatus.getPath().toString();
        }).collect(Collectors.toList());
        Assert.assertEquals("Should be 2 files", 2L, list.size());
        ArrayList newArrayList = Lists.newArrayList(list);
        newArrayList.removeIf(str2 -> {
            return str2.contains(str);
        });
        Assert.assertEquals("Should be 1 invalid file", 1L, newArrayList.size());
        Thread.sleep(1000L);
        Assert.assertEquals("Action should find 1 file", newArrayList, ((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis()).deleteWith(str3 -> {
        }).execute()).orphanFileLocations());
        Assert.assertTrue("Invalid file should be present", fileSystem.exists(new Path((String) newArrayList.get(0))));
    }

    @Test
    public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException {
        Table createTable = new HadoopCatalog(new Configuration(), this.tableLocation).createTable(TableIdentifier.of(Namespace.of(new String[]{"testDb"}), "testTb"), SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap());
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset coalesce = spark.createDataFrame(newArrayList, ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(createTable.location());
        coalesce.write().mode("append").parquet(createTable.location() + "/data");
        Thread.sleep(1000L);
        createTable.refresh();
        Assert.assertEquals("Should delete only 1 files", 1L, Iterables.size(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(createTable).olderThan(System.currentTimeMillis()).execute()).orphanFileLocations()));
        Assert.assertEquals("Rows must match", newArrayList, spark.read().format("iceberg").load(createTable.location()).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList());
    }

    @Test
    public void testHiveCatalogTable() throws IOException {
        Table createTable = catalog.createTable(TableIdentifier.of(new String[]{"default", "hivetestorphan"}), SCHEMA, SPEC, this.tableLocation, Maps.newHashMap());
        spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save("default.hivetestorphan");
        String replaceFirst = createTable.location().replaceFirst("file:", "");
        new File(replaceFirst + "/data/trashfile").createNewFile();
        Assert.assertTrue("trash file should be removed", StreamSupport.stream(((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(createTable).olderThan(System.currentTimeMillis() + 1000).execute()).orphanFileLocations().spliterator(), false).anyMatch(str -> {
            return str.contains("file:" + replaceFirst + "data/trashfile");
        }));
    }

    @Test
    public void testGarbageCollectionDisabled() {
        Table create = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableLocation);
        spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        create.updateProperties().set("gc.enabled", "false").commit();
        AssertHelpers.assertThrows("Should complain about removing orphan files", ValidationException.class, "Cannot remove orphan files: GC is disabled", () -> {
            return (DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).execute();
        });
    }

    @Test
    public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
        Table create = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "2"), this.tableLocation);
        spark.createDataFrame(Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")}), ThreeColumnRecord.class).coalesce(1).select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        create.refresh();
        long snapshotId = create.currentSnapshot().snapshotId();
        long sequenceNumber = create.currentSnapshot().sequenceNumber();
        File file = new File(new URI(this.tableLocation)).toPath().resolve("data").resolve("some-stats-file").toFile();
        PuffinWriter build = Puffin.write(Files.localOutput(file)).build();
        Throwable th = null;
        try {
            try {
                build.add(new Blob("some-blob-type", ImmutableList.of(1), snapshotId, sequenceNumber, ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
                build.finish();
                GenericStatisticsFile genericStatisticsFile = new GenericStatisticsFile(snapshotId, file.toString(), build.fileSize(), build.footerSize(), (List) build.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).collect(ImmutableList.toImmutableList()));
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
                Transaction newTransaction = create.newTransaction();
                newTransaction.updateStatistics().setStatistics(snapshotId, genericStatisticsFile).commit();
                newTransaction.commitTransaction();
                SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis() + 1000).execute();
                ((AbstractBooleanAssert) Assertions.assertThat(file.exists()).as("stats file should exist", new Object[0])).isTrue();
                Assertions.assertThat(file.length()).as("stats file length", new Object[0]).isEqualTo(genericStatisticsFile.fileSizeInBytes());
                Transaction newTransaction2 = create.newTransaction();
                newTransaction2.updateStatistics().removeStatistics(genericStatisticsFile.snapshotId()).commit();
                newTransaction2.commitTransaction();
                Iterable orphanFileLocations = ((DeleteOrphanFiles.Result) SparkActions.get().deleteOrphanFiles(create).olderThan(System.currentTimeMillis() + 1000).execute()).orphanFileLocations();
                Assertions.assertThat(orphanFileLocations).as("Should be orphan files", new Object[0]).hasSize(1);
                ((AbstractStringAssert) Assertions.assertThat((String) Iterables.getOnlyElement(orphanFileLocations)).as("Deleted file", new Object[0])).isEqualTo(file.toURI().toString());
                ((AbstractBooleanAssert) Assertions.assertThat(file.exists()).as("stats file should be deleted", new Object[0])).isFalse();
            } finally {
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }
}
