/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public abstract class TestRemoveOrphanFilesAction
extends SparkTestBase {
    // Path-based table factory shared by every test that creates a table from a raw location.
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());

    // Three optional columns: an integer column c1 and two string columns c2 and c3.
    protected static final Schema SCHEMA = new Schema(
        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
        Types.NestedField.optional(2, "c2", Types.StringType.get()),
        Types.NestedField.optional(3, "c3", Types.StringType.get()));

    // Two-level partitioning: truncate(c2, 2) then identity(c3),
    // which yields directories such as data/c2_trunc=AA/c3=AAAA.
    protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
        .truncate("c2", 2)
        .identity("c3")
        .build();

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    // Both are (re)assigned before each test by setupTableLocation().
    private File tableDir = null;
    protected String tableLocation = null;

    /**
     * Allocates a fresh temporary directory for the test and records it both as a {@link File}
     * and as a URI string usable as an Iceberg table location.
     */
    @Before
    public void setupTableLocation() throws Exception {
        File freshDir = temp.newFolder();
        tableDir = freshDir;
        tableLocation = freshDir.toURI().toString();
    }

    /**
     * Verifies that a no-op {@code deleteWith} callback turns the action into a dry run, that the
     * default {@code olderThan} cutoff does not select a freshly written orphan, and that a real
     * run removes exactly the orphan while the table data remains readable.
     */
    @Test
    public void testDryRun() throws IOException, InterruptedException {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        List<String> validFiles = spark.read().format("iceberg")
            .load(this.tableLocation + "#files")
            .select("file_path")
            .as(Encoders.STRING())
            .collectAsList();
        Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());

        // A plain Parquet write into the data directory produces a file no snapshot references.
        df.write().mode("append").parquet(this.tableLocation + "/data");

        Path dataPath = new Path(this.tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
            .filter(FileStatus::isFile)
            .map(file -> file.getPath().toString())
            .collect(Collectors.toList());
        Assert.assertEquals("Should be 3 files", 3, allFiles.size());

        List<String> invalidFiles = Lists.newArrayList(allFiles);
        invalidFiles.removeAll(validFiles);
        Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();

        // Without an explicit olderThan, the just-written orphan must not be selected.
        DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
        Assert.assertTrue("Default olderThan interval should be safe", Iterables.isEmpty(result1.orphanFileLocations()));

        // Dry run: the orphan is reported but the no-op callback leaves it on disk.
        DeleteOrphanFiles.Result result2 = actions.deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis())
            .deleteWith(s -> {})
            .execute();
        Assert.assertEquals("Action should find 1 file", invalidFiles, result2.orphanFileLocations());
        Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));

        // Real run with the default delete function.
        DeleteOrphanFiles.Result result3 = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Action should delete 1 file", invalidFiles, result3.orphanFileLocations());
        Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0))));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records);
        expectedRecords.addAll(records);

        Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
        List<ThreeColumnRecord> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
    }

    /**
     * Verifies that files referenced by any of three snapshots survive, while four unreferenced
     * Parquet files written at different depths under the data directory are deleted.
     */
    @Test
    public void testAllValidFilesAreKept() throws IOException, InterruptedException {
        Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
        df1.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
        // An overwrite followed by an append produce the second and third snapshots.
        df2.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(this.tableLocation);
        df2.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
        List<String> snapshotFiles1 = this.snapshotFiles(snapshots.get(0).snapshotId());
        Assert.assertEquals(1, snapshotFiles1.size());
        List<String> snapshotFiles2 = this.snapshotFiles(snapshots.get(1).snapshotId());
        Assert.assertEquals(1, snapshotFiles2.size());
        List<String> snapshotFiles3 = this.snapshotFiles(snapshots.get(2).snapshotId());
        Assert.assertEquals(2, snapshotFiles3.size());

        // Orphans at the data root, inside each partition level, and in a non-partition folder.
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/invalid/invalid");

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 4 files", 4, Iterables.size(result.orphanFileLocations()));

        Path dataPath = new Path(this.tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        for (String fileLocation : Iterables.concat(snapshotFiles1, snapshotFiles2, snapshotFiles3)) {
            Assert.assertTrue("All snapshot files must remain", fs.exists(new Path(fileLocation)));
        }
    }

    /**
     * Verifies that deletes are dispatched to the executor passed via {@code executeDeleteWith}
     * and that every orphan is handed to the delete callback.
     */
    @Test
    public void orphanedFileRemovedWithParallelTasks() throws InterruptedException, IOException {
        Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
        df1.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
        df2.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(this.tableLocation);
        df2.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        // Four orphan files, one per location.
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        df2.coalesce(1).write().mode("append").parquet(this.tableLocation + "/data/invalid/invalid");
        this.waitUntilAfter(System.currentTimeMillis());

        // The delete callback runs on several pool threads concurrently, so both collectors must be
        // thread-safe; a plain HashSet can silently lose concurrent inserts.
        Set<String> deletedFiles = ConcurrentHashMap.newKeySet();
        Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
        AtomicInteger deleteThreadsIndex = new AtomicInteger(0);

        ExecutorService executorService = Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("remove-orphan-" + deleteThreadsIndex.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        try {
            SparkActions.get()
                .deleteOrphanFiles(table)
                .executeDeleteWith(executorService)
                .olderThan(System.currentTimeMillis() + 5000L) // push the cutoff ahead so every orphan qualifies
                .deleteWith(file -> {
                    deleteThreads.add(Thread.currentThread().getName());
                    deletedFiles.add(file);
                })
                .execute();
        } finally {
            // Release the pool's threads; the action has already finished using it.
            executorService.shutdown();
        }

        // The deletes must have run on the threads created by the provided ThreadFactory.
        Assert.assertEquals(
            "Deletes should run on the provided executor's threads",
            Sets.newHashSet("remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"),
            deleteThreads);
        Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
    }

    /**
     * Verifies that data files of a staged write-audit-publish (WAP) snapshot, which is not visible
     * to readers, are not treated as orphans.
     */
    @Test
    public void testWapFilesAreKept() throws InterruptedException {
        Map<String, String> props = Maps.newHashMap();
        props.put("write.wap.enabled", "true");
        Table table = TABLES.create(SCHEMA, SPEC, props, this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);

        // Regular, immediately published write.
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        // The Spark session is shared across tests, so the WAP id must not outlive this test.
        spark.conf().set("spark.wap.id", "1");
        try {
            // Staged WAP write: produces data files that no current snapshot exposes.
            df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

            Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
            List<ThreeColumnRecord> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
            Assert.assertEquals("Should not return data from the staged snapshot", records, actualRecords);

            this.waitUntilAfter(System.currentTimeMillis());
            SparkActions actions = SparkActions.get();
            DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
            Assert.assertTrue("Should not delete any files", Iterables.isEmpty(result.orphanFileLocations()));
        } finally {
            spark.conf().unset("spark.wap.id");
        }
    }

    /**
     * Verifies that when {@code write.data.path} points at the table root, only the stray Parquet
     * file is removed and the table remains readable.
     */
    @Test
    public void testMetadataFolderIsIntact() throws InterruptedException {
        Map<String, String> props = Maps.newHashMap();
        props.put("write.data.path", this.tableLocation);
        Table table = TABLES.create(SCHEMA, SPEC, props, this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        // Orphan placed in a partition directory directly under the table root.
        df.write().mode("append").parquet(this.tableLocation + "/c2_trunc=AA/c3=AAAA");

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.orphanFileLocations()));

        Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
        List<ThreeColumnRecord> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Rows must match", records, actualRecords);
    }

    /**
     * Verifies that only orphans last modified before the {@code olderThan} cutoff are deleted:
     * two orphans are written before the cutoff and one after it.
     */
    @Test
    public void testOlderThanTimestamp() throws InterruptedException {
        Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        df.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        df.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");

        this.waitUntilAfter(System.currentTimeMillis());
        long timestamp = System.currentTimeMillis();
        // Leave a clear gap so the next orphan's modification time falls after the cutoff.
        this.waitUntilAfter(System.currentTimeMillis() + 1000L);

        df.write().mode("append").parquet(this.tableLocation + "/data/c2_trunc=AA/c3=AAAA");

        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(timestamp).execute();
        Assert.assertEquals("Should delete only 2 files", 2, Iterables.size(result.orphanFileLocations()));
    }

    /**
     * Verifies that with {@code write.metadata.previous-versions-max=1}, the metadata file that
     * drops out of the version log ({@code v1.metadata.json}) is removed as an orphan and the
     * table data stays readable.
     */
    @Test
    public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedException {
        Map<String, String> props = Maps.newHashMap();
        props.put("write.data.path", this.tableLocation);
        props.put("write.metadata.previous-versions-max", "1");
        Table table = TABLES.create(SCHEMA, SPEC, props, this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.orphanFileLocations()));
        Assert.assertTrue(
            "Should remove v1 file",
            StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
                .anyMatch(file -> file.contains("v1.metadata.json")));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records);
        expectedRecords.addAll(records);

        Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
        List<ThreeColumnRecord> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
    }

    /**
     * Verifies that a table with 100 distinct top-level partition values yields no false orphans.
     */
    @Test
    public void testManyTopLevelPartitions() throws InterruptedException {
        Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        for (int i = 0; i < 100; ++i) {
            records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i)));
        }

        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertTrue("Should not delete any files", Iterables.isEmpty(result.orphanFileLocations()));

        Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
        Assert.assertEquals("Rows count must match", records.size(), resultDF.count());
    }

    /**
     * Verifies that a table with few top-level but many leaf partition values (c2 cycles through
     * three values, c3 is unique per row) yields no false orphans.
     */
    @Test
    public void testManyLeafPartitions() throws InterruptedException {
        Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        for (int i = 0; i < 100; ++i) {
            records.add(new ThreeColumnRecord(i, String.valueOf(i % 3), String.valueOf(i)));
        }

        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertTrue("Should not delete any files", Iterables.isEmpty(result.orphanFileLocations()));

        Dataset<Row> resultDF = spark.read().format("iceberg").load(this.tableLocation);
        Assert.assertEquals("Row count must match", records.size(), resultDF.count());
    }

    /**
     * Verifies that partition directories whose names start with an underscore (from the partition
     * column {@code _c2}) are still scanned, so orphans inside them are found.
     */
    @Test
    public void testHiddenPartitionPaths() throws InterruptedException {
        Schema schema = new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "_c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).identity("c3").build();
        Table table = TABLES.create(schema, spec, Maps.newHashMap(), this.tableLocation);

        StructType structType = new StructType()
            .add("c1", DataTypes.IntegerType)
            .add("_c2", DataTypes.StringType)
            .add("c3", DataTypes.StringType);
        List<Row> records = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);
        df.select("c1", "_c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        df.write().mode("append").parquet(this.tableLocation + "/data/_c2_trunc=AA/c3=AAAA");
        df.write().mode("append").parquet(this.tableLocation + "/data/_c2_trunc=AA/c3=AAAA");

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations()));
    }

    /**
     * Verifies that underscore-prefixed partition directories are scanned under both the original
     * spec and an evolved spec that adds the {@code _c1} field.
     */
    @Test
    public void testHiddenPartitionPathsWithPartitionEvolution() throws InterruptedException {
        Schema schema = new Schema(
            Types.NestedField.optional(1, "_c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "_c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).build();
        Table table = TABLES.create(schema, spec, Maps.newHashMap(), this.tableLocation);

        StructType structType = new StructType()
            .add("_c1", DataTypes.IntegerType)
            .add("_c2", DataTypes.StringType)
            .add("c3", DataTypes.StringType);
        List<Row> records = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);
        df.select("_c1", "_c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        // One orphan under the original layout...
        df.write().mode("append").parquet(this.tableLocation + "/data/_c2_trunc=AA");

        table.updateSpec().addField("_c1").commit();

        // ...and one under the evolved layout.
        df.write().mode("append").parquet(this.tableLocation + "/data/_c2_trunc=AA/_c1=1");

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations()));
    }

    /**
     * Verifies that a hidden directory whose name merely starts with a partition field name
     * ({@code _c2_trunc} without {@code =value}) is not scanned, so its contents are left alone.
     */
    @Test
    public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws InterruptedException, IOException {
        Schema schema = new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "_c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).identity("c3").build();
        Table table = TABLES.create(schema, spec, Maps.newHashMap(), this.tableLocation);

        StructType structType = new StructType()
            .add("c1", DataTypes.IntegerType)
            .add("_c2", DataTypes.StringType)
            .add("c3", DataTypes.StringType);
        List<Row> records = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);
        df.select("c1", "_c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        Path dataPath = new Path(this.tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt");
        // Fail fast if the fixture file could not be created; otherwise the final check is meaningless.
        Assert.assertTrue("Should create file in hidden folder", fs.createNewFile(pathToFileInHiddenFolder));

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 0 files", 0, Iterables.size(result.orphanFileLocations()));
        Assert.assertTrue(fs.exists(pathToFileInHiddenFolder));
    }

    /**
     * Returns the {@code file_path} column of the table's {@code files} metadata table, read as of
     * the given snapshot.
     */
    private List<String> snapshotFiles(long snapshotId) {
        Dataset<Row> filesAtSnapshot = spark.read()
            .format("iceberg")
            .option("snapshot-id", snapshotId)
            .load(this.tableLocation + "#files");
        return filesAtSnapshot
            .select("file_path")
            .as(Encoders.STRING())
            .collectAsList();
    }

    /**
     * Verifies orphan detection for a table created from a plain absolute filesystem path (no
     * {@code file:} scheme), using a dry run so the orphan stays on disk.
     */
    @Test
    public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, InterruptedException {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableDir.getAbsolutePath());

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableDir.getAbsolutePath());

        List<String> validFiles = spark.read().format("iceberg")
            .load(this.tableLocation + "#files")
            .select("file_path")
            .as(Encoders.STRING())
            .collectAsList();
        Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
        String validFile = validFiles.get(0);

        df.write().mode("append").parquet(this.tableLocation + "/data");

        Path dataPath = new Path(this.tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
            .filter(FileStatus::isFile)
            .map(file -> file.getPath().toString())
            .collect(Collectors.toList());
        Assert.assertEquals("Should be 2 files", 2, allFiles.size());

        // The listed paths carry a scheme while the table's recorded path may not, so match by
        // containment rather than equality. A typed list is required for String.contains here.
        List<String> invalidFiles = Lists.newArrayList(allFiles);
        invalidFiles.removeIf(file -> file.contains(validFile));
        Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis())
            .deleteWith(s -> {})
            .execute();
        Assert.assertEquals("Action should find 1 file", invalidFiles, result.orphanFileLocations());
        Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
    }

    /**
     * Verifies orphan removal for a table created through a {@link HadoopCatalog} rooted at the
     * test's temporary location.
     */
    @Test
    public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException {
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), this.tableLocation);
        String namespaceName = "testDb";
        String tableName = "testTb";
        Namespace namespace = Namespace.of(namespaceName);
        TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
        Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap());

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());

        df.write().mode("append").parquet(table.location() + "/data");

        this.waitUntilAfter(System.currentTimeMillis());
        // The Spark write committed through a different handle; refresh to see the new snapshot.
        table.refresh();

        DeleteOrphanFiles.Result result = SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete only 1 files", 1, Iterables.size(result.orphanFileLocations()));

        Dataset<Row> resultDF = spark.read().format("iceberg").load(table.location());
        List<ThreeColumnRecord> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Rows must match", records, actualRecords);
    }

    /**
     * Verifies that an untracked file dropped into the data directory of a table created through
     * the shared {@code catalog} is reported as an orphan.
     */
    @Test
    public void testHiveCatalogTable() throws IOException {
        Table table = catalog.createTable(
            TableIdentifier.of("default", "hivetestorphan"), SCHEMA, SPEC, this.tableLocation, Maps.newHashMap());

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save("default.hivetestorphan");

        // The table location is a file: URI; strip the scheme to get a local path for java.io.File.
        String location = table.location().replaceFirst("file:", "");
        Assert.assertTrue("Should create trash file", new File(location + "/data/trashfile").createNewFile());

        DeleteOrphanFiles.Result result = SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis() + 1000L)
            .execute();
        Assert.assertTrue(
            "trash file should be removed",
            StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
                .anyMatch(file -> file.contains("file:" + location + "/data/trashfile")));
    }

    /**
     * Verifies that the action refuses to run when the table property {@code gc.enabled} is false.
     */
    @Test
    public void testGarbageCollectionDisabled() {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);

        table.updateProperties().set("gc.enabled", "false").commit();

        Assertions.assertThatThrownBy(() -> SparkActions.get().deleteOrphanFiles(table).execute())
            .isInstanceOf(ValidationException.class)
            .hasMessage("Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
    }

    /**
     * Exercises {@code compareToFileList}: a caller-supplied listing is used as the set of actual
     * files, the default and explicit {@code olderThan} thresholds are honoured, a no-op
     * {@code deleteWith} leaves files in place, and listed files outside the table location are
     * never reported.
     */
    @Test
    public void testCompareToFileList() throws IOException, InterruptedException {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), this.tableLocation);
        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        // Two appends, so table metadata tracks two data files.
        for (int append = 0; append < 2; append++) {
            df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);
        }

        Path dataPath = new Path(this.tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        List<FilePathLastModifiedRecord> validFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
                .filter(FileStatus::isFile)
                .map(status -> new FilePathLastModifiedRecord(
                        status.getPath().toString(), new Timestamp(status.getModificationTime())))
                .collect(Collectors.toList());
        Assert.assertEquals("Should be 2 valid files", 2L, validFiles.size());

        // Drop a plain Parquet file into the data directory that the table does not reference.
        df.write().mode("append").parquet(this.tableLocation + "/data");
        List<FilePathLastModifiedRecord> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
                .filter(FileStatus::isFile)
                .map(status -> new FilePathLastModifiedRecord(
                        status.getPath().toString(), new Timestamp(status.getModificationTime())))
                .collect(Collectors.toList());
        Assert.assertEquals("Should be 3 files", 3L, allFiles.size());

        List<FilePathLastModifiedRecord> invalidFiles = Lists.newArrayList(allFiles);
        invalidFiles.removeAll(validFiles);
        List<String> invalidFilePaths = invalidFiles.stream()
                .map(FilePathLastModifiedRecord::getFilePath)
                .collect(Collectors.toList());
        Assert.assertEquals("Should be 1 invalid file", 1L, invalidFiles.size());

        this.waitUntilAfter(System.currentTimeMillis());
        SparkActions actions = SparkActions.get();
        Dataset compareToFileList = spark.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
                .withColumnRenamed("filePath", "file_path")
                .withColumnRenamed("lastModified", "last_modified");

        // Default olderThan: the freshly written file is too new to be considered an orphan.
        DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table)
                .compareToFileList(compareToFileList)
                .deleteWith(s -> {})
                .execute();
        Assert.assertTrue("Default olderThan interval should be safe", Iterables.isEmpty(result1.orphanFileLocations()));

        // Explicit olderThan with a no-op deleter: the orphan is reported but stays on disk.
        DeleteOrphanFiles.Result result2 = actions.deleteOrphanFiles(table)
                .compareToFileList(compareToFileList)
                .olderThan(System.currentTimeMillis())
                .deleteWith(s -> {})
                .execute();
        Assert.assertEquals("Action should find 1 file", invalidFilePaths, result2.orphanFileLocations());
        Path invalidFile = new Path(invalidFilePaths.get(0));
        Assert.assertTrue("Invalid file should be present", fs.exists(invalidFile));

        // Explicit olderThan with the default deleter: the orphan is reported and removed.
        DeleteOrphanFiles.Result result3 = actions.deleteOrphanFiles(table)
                .compareToFileList(compareToFileList)
                .olderThan(System.currentTimeMillis())
                .execute();
        Assert.assertEquals("Action should delete 1 file", invalidFilePaths, result3.orphanFileLocations());
        Assert.assertFalse("Invalid file should not be present", fs.exists(invalidFile));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records);
        expectedRecords.addAll(records);
        Dataset resultDF = spark.read().format("iceberg").load(this.tableLocation);
        List<?> actualRecords = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Rows must match", expectedRecords, actualRecords);

        // Listed files outside the table location must be ignored.
        List<FilePathLastModifiedRecord> outsideLocationMockFiles =
                Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L)));
        Dataset compareToFileListWithOutsideLocation = spark
                .createDataFrame(outsideLocationMockFiles, FilePathLastModifiedRecord.class)
                .withColumnRenamed("filePath", "file_path")
                .withColumnRenamed("lastModified", "last_modified");
        DeleteOrphanFiles.Result result4 = actions.deleteOrphanFiles(table)
                .compareToFileList(compareToFileListWithOutsideLocation)
                .deleteWith(s -> {})
                .execute();
        Assert.assertEquals("Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
    }

    /**
     * Spins until the wall clock has moved strictly past {@code timestampMillis}.
     *
     * @param timestampMillis epoch-millisecond instant to wait beyond
     * @return the first observed {@code System.currentTimeMillis()} value greater than
     *     {@code timestampMillis}
     */
    @Override
    protected long waitUntilAfter(long timestampMillis) {
        long now;
        do {
            now = System.currentTimeMillis();
        } while (now <= timestampMillis);
        return now;
    }

    /**
     * A Puffin statistics file referenced from table metadata must survive orphan-file removal.
     * Once the metadata reference is removed, it must be reported as the only orphan and deleted.
     */
    @Test
    public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
        Table table = TABLES.create(
                SCHEMA, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "2"), this.tableLocation);
        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.tableLocation);
        table.refresh();

        long snapshotId = table.currentSnapshot().snapshotId();
        long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
        File statsLocation = new File(new URI(this.tableLocation))
                .toPath()
                .resolve("data")
                .resolve("some-stats-file")
                .toFile();

        // Write a one-blob Puffin file and describe it as table statistics for the snapshot.
        GenericStatisticsFile statisticsFile;
        try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
            ByteBuffer blobContent = ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8));
            puffinWriter.add(new Blob("some-blob-type", ImmutableList.of(1), snapshotId, snapshotSequenceNumber, blobContent));
            puffinWriter.finish();
            statisticsFile = new GenericStatisticsFile(
                    snapshotId,
                    statsLocation.toString(),
                    puffinWriter.fileSize(),
                    puffinWriter.footerSize(),
                    puffinWriter.writtenBlobsMetadata().stream()
                            .map(GenericBlobMetadata::from)
                            .collect(ImmutableList.toImmutableList()));
        }

        // Referenced statistics: the file must be left untouched.
        Transaction transaction = table.newTransaction();
        transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
        transaction.commitTransaction();
        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis() + 1000L).execute();
        Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
        Assertions.assertThat(statsLocation.length()).as("stats file length").isEqualTo(statisticsFile.fileSizeInBytes());

        // Unreferenced statistics: the file becomes the single orphan and is deleted.
        transaction = table.newTransaction();
        transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
        transaction.commitTransaction();
        DeleteOrphanFiles.Result result = SparkActions.get()
                .deleteOrphanFiles(table)
                .olderThan(System.currentTimeMillis() + 1000L)
                .execute();
        Iterable<String> orphanFileLocations = result.orphanFileLocations();
        Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
        String deletedFile = Iterables.getOnlyElement(orphanFileLocations);
        Assertions.assertThat(deletedFile).as("Deleted file").isEqualTo(statsLocation.toURI().toString());
        Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
    }

    /** Redundant slashes in an actual file's path must not make it look like an orphan. */
    @Test
    public void testPathsWithExtraSlashes() {
        List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
        this.executeTest(validFiles, actualFiles, Lists.newArrayList());
    }

    /** A valid-file path without an authority still matches an actual file that has one. */
    @Test
    public void testPathsWithValidFileHavingNoAuthority() {
        List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
        this.executeTest(validFiles, actualFiles, Lists.newArrayList());
    }

    /** An actual-file path without an authority still matches a valid file that has one. */
    @Test
    public void testPathsWithActualFileHavingNoAuthority() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
        this.executeTest(validFiles, actualFiles, Lists.newArrayList());
    }

    /**
     * Differing schemes are a conflict in ERROR mode unless both are mapped to the same scheme
     * via the equal-schemes table.
     */
    @Test
    public void testPathsWithEqualSchemes() {
        List<String> validFiles = Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");

        Assertions.assertThatThrownBy(() -> this.executeTest(
                        validFiles, actualFiles, Lists.newArrayList(),
                        ImmutableMap.of(), ImmutableMap.of(), DeleteOrphanFiles.PrefixMismatchMode.ERROR))
                .isInstanceOf(ValidationException.class)
                .hasMessageStartingWith("Unable to determine whether certain files are orphan")
                .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");

        Map<String, String> equalSchemes = Maps.newHashMap();
        equalSchemes.put("scheme1", "scheme");
        equalSchemes.put("scheme2", "scheme");
        this.executeTest(validFiles, actualFiles, Lists.newArrayList(),
                equalSchemes, ImmutableMap.of(), DeleteOrphanFiles.PrefixMismatchMode.ERROR);
    }

    /**
     * Differing authorities are a conflict in ERROR mode unless both are mapped to the same
     * authority via the equal-authorities table.
     */
    @Test
    public void testPathsWithEqualAuthorities() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");

        Assertions.assertThatThrownBy(() -> this.executeTest(
                        validFiles, actualFiles, Lists.newArrayList(),
                        ImmutableMap.of(), ImmutableMap.of(), DeleteOrphanFiles.PrefixMismatchMode.ERROR))
                .isInstanceOf(ValidationException.class)
                .hasMessageStartingWith("Unable to determine whether certain files are orphan")
                .hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)].");

        Map<String, String> equalAuthorities = Maps.newHashMap();
        equalAuthorities.put("servicename1", "servicename");
        equalAuthorities.put("servicename2", "servicename");
        this.executeTest(validFiles, actualFiles, Lists.newArrayList(),
                ImmutableMap.of(), equalAuthorities, DeleteOrphanFiles.PrefixMismatchMode.ERROR);
    }

    /** In DELETE mode, an actual file whose authority conflicts with the valid file is an orphan. */
    @Test
    public void testRemoveOrphanFileActionWithDeleteMode() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
        List<String> expectedOrphans = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
        this.executeTest(validFiles, actualFiles, expectedOrphans,
                ImmutableMap.of(), ImmutableMap.of(), DeleteOrphanFiles.PrefixMismatchMode.DELETE);
    }

    /**
     * Shorthand for the six-argument {@code executeTest} with no scheme or authority equivalences
     * and the {@code IGNORE} prefix-mismatch mode.
     */
    private void executeTest(List<String> validFiles, List<String> actualFiles, List<String> expectedOrphanFiles) {
        Map<String, String> noEquivalences = ImmutableMap.of();
        this.executeTest(validFiles, actualFiles, expectedOrphanFiles,
                noEquivalences, noEquivalences, DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
    }

    /**
     * Converts both path lists to file URIs using the given scheme/authority equivalences, then
     * runs {@code DeleteOrphanFilesSparkAction.findOrphanFiles}. Actual files are passed first and
     * valid files second. Asserts that the result equals {@code expectedOrphanFiles}.
     *
     * @param validFiles paths considered referenced by the table
     * @param actualFiles paths considered present in storage
     * @param expectedOrphanFiles orphan paths the comparison is expected to produce
     * @param equalSchemes scheme equivalence table passed to {@code StringToFileURI}
     * @param equalAuthorities authority equivalence table passed to {@code StringToFileURI}
     * @param mode how prefix (scheme/authority) mismatches are handled
     */
    private void executeTest(List<String> validFiles, List<String> actualFiles, List<String> expectedOrphanFiles, Map<String, String> equalSchemes, Map<String, String> equalAuthorities, DeleteOrphanFiles.PrefixMismatchMode mode) {
        DeleteOrphanFilesSparkAction.StringToFileURI toFileUri =
                new DeleteOrphanFilesSparkAction.StringToFileURI(equalSchemes, equalAuthorities);
        Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
        Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
        Dataset actualFileUris = toFileUri.apply(actualFileDS);
        Dataset validFileUris = toFileUri.apply(validFileDS);
        List<?> orphanFiles = DeleteOrphanFilesSparkAction.findOrphanFiles(
                (SparkSession) spark, actualFileUris, validFileUris, mode);
        Assert.assertEquals(expectedOrphanFiles, orphanFiles);
    }
}

