package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.class */
public abstract class TestRemoveOrphanFilesAction extends TestBase {
    /** Path-based table helper used to create the tables under test at {@code tableLocation}. */
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());

    /** Three optional columns: int {@code c1} and strings {@code c2} and {@code c3}. */
    protected static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "c2", Types.StringType.get()),
                    Types.NestedField.optional(3, "c3", Types.StringType.get()));

    /** Partitions by {@code truncate(c2, 2)} and then by the identity of {@code c3}. */
    protected static final PartitionSpec SPEC =
            PartitionSpec.builderFor(SCHEMA).truncate("c2", 2).identity("c3").build();

    // Per-test temporary directory injected by JUnit.
    @TempDir
    private File tableDir;

    // URI string of tableDir; assigned in setupTableLocation().
    protected String tableLocation;

    // Table properties for the current parameterization; assigned in setupTableLocation().
    protected Map<String, String> properties;

    // Table format version supplied by ParameterizedTestExtension from parameters().
    @Parameter
    private int formatVersion;

    /**
     * Lists the table format versions every {@code @TestTemplate} in this class runs against.
     *
     * @return the format versions 2 and 3
     */
    @Parameters(name = "formatVersion = {0}")
    protected static List<Object> parameters() {
        List<Object> formatVersions = Arrays.asList(2, 3);
        return formatVersions;
    }

    /**
     * Points {@code tableLocation} at the JUnit temp directory (as a URI string) and builds the table
     * properties carrying the current {@code format-version}.
     */
    @BeforeEach
    public void setupTableLocation() throws Exception {
        String formatVersionValue = String.valueOf(formatVersion);
        tableLocation = tableDir.toURI().toString();
        properties = ImmutableMap.of("format-version", formatVersionValue);
    }

    /**
     * Verifies three things. The default {@code olderThan} interval does not report a freshly written
     * orphan file. A no-op {@code deleteWith} callback reports the orphan without removing it. Running
     * without a callback deletes the orphan while table data stays intact.
     */
    @TestTemplate
    public void testDryRun() throws IOException {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

        // two Iceberg appends -> two tracked data files
        for (int commit = 0; commit < 2; commit++) {
            df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
        }

        List<String> validFiles =
                spark.read()
                        .format("iceberg")
                        .load(tableLocation + "#files")
                        .select("file_path")
                        .as(Encoders.STRING())
                        .collectAsList();
        Assertions.assertThat(validFiles).as("Should be 2 valid files").hasSize(2);

        // a plain Parquet write into the data directory is not tracked by the table
        df.write().mode("append").parquet(tableLocation + "/data");

        Path dataPath = new Path(tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        List<String> allFiles =
                Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
                        .filter(status -> status.isFile())
                        .map(status -> status.getPath().toString())
                        .collect(Collectors.toList());
        Assertions.assertThat(allFiles).as("Should be 3 valid files").hasSize(3);

        List<String> invalidFiles = Lists.newArrayList(allFiles);
        invalidFiles.removeAll(validFiles);
        Assertions.assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
        String invalidFile = invalidFiles.get(0);

        waitUntilAfter(System.currentTimeMillis());

        SparkActions actions = SparkActions.get();

        Assertions.assertThat(
                        actions.deleteOrphanFiles(table)
                                .deleteWith(location -> {})
                                .execute()
                                .orphanFileLocations())
                .as("Default olderThan interval should be safe")
                .isEmpty();

        // no-op delete callback: the orphan is reported but left on disk
        Assertions.assertThat(
                        actions.deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .deleteWith(location -> {})
                                .execute()
                                .orphanFileLocations())
                .as("Action should find 1 file")
                .isEqualTo(invalidFiles);
        Assertions.assertThat(fs.exists(new Path(invalidFile)))
                .as("Invalid file should be present")
                .isTrue();

        // default delete: the orphan is actually removed
        Assertions.assertThat(
                        actions.deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Action should delete 1 file")
                .isEqualTo(invalidFiles);
        Assertions.assertThat(fs.exists(new Path(invalidFile)))
                .as("Invalid file should not be present")
                .isFalse();

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records);
        expectedRecords.addAll(records);
        Assertions.assertThat(
                        spark.read()
                                .format("iceberg")
                                .load(tableLocation)
                                .as(Encoders.bean(ThreeColumnRecord.class))
                                .collectAsList())
                .isEqualTo(expectedRecords);
    }

    /**
     * Writes three snapshots (append, overwrite, append) plus four untracked Parquet files in various
     * directories. Verifies that exactly the four untracked files are reported and that every data file
     * referenced by any of the three snapshots still exists.
     */
    @TestTemplate
    public void testAllValidFilesAreKept() throws IOException {
        Table table = TABLES.create(SCHEMA, SPEC, properties, tableLocation);

        List<ThreeColumnRecord> firstBatch =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(firstBatch, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        List<ThreeColumnRecord> secondBatch =
                Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(secondBatch, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(tableLocation);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());

        List<String> firstSnapshotFiles = snapshotFiles(snapshots.get(0).snapshotId());
        Assertions.assertThat(firstSnapshotFiles).hasSize(1);

        List<String> secondSnapshotFiles = snapshotFiles(snapshots.get(1).snapshotId());
        Assertions.assertThat(secondSnapshotFiles).hasSize(1);

        List<String> thirdSnapshotFiles = snapshotFiles(snapshots.get(2).snapshotId());
        Assertions.assertThat(thirdSnapshotFiles).hasSize(2);

        // untracked files: data root, each partition level, and a non-partition directory
        List<String> orphanDirs =
                Arrays.asList(
                        "/data", "/data/c2_trunc=AA", "/data/c2_trunc=AA/c3=AAAA", "/data/invalid/invalid");
        for (String orphanDir : orphanDirs) {
            df.coalesce(1).write().mode("append").parquet(tableLocation + orphanDir);
        }

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete 4 files")
                .hasSize(4);

        FileSystem fs =
                new Path(tableLocation + "/data").getFileSystem(spark.sessionState().newHadoopConf());

        List<String> referencedFiles = Lists.newArrayList(firstSnapshotFiles);
        referencedFiles.addAll(secondSnapshotFiles);
        referencedFiles.addAll(thirdSnapshotFiles);
        for (String referencedFile : referencedFiles) {
            Assertions.assertThat(fs.exists(new Path(referencedFile)))
                    .as("All snapshot files must remain")
                    .isTrue();
        }
    }

    /**
     * Verifies that orphan deletes run on the executor passed to {@code executeDeleteWith}. With a
     * 4-thread pool and 4 orphan files, all four pool threads are expected to invoke the delete
     * callback, and four distinct files are deleted.
     *
     * <p>The pool is shut down once the action completes. Fixed-pool worker threads never time out, so
     * without an explicit shutdown each run of this template would leak idle threads.
     */
    @TestTemplate
    public void orphanedFileRemovedWithParallelTasks() {
        Table table = TABLES.create(SCHEMA, SPEC, properties, tableLocation);

        List<ThreeColumnRecord> firstBatch =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(firstBatch, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        List<ThreeColumnRecord> secondBatch =
                Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(secondBatch, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(tableLocation);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        // four untracked Parquet files in different directories
        df.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
        df.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
        df.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
        df.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");

        waitUntilAfter(System.currentTimeMillis());

        // written concurrently from the delete callback, hence concurrent sets
        Set<String> deletedFiles = ConcurrentHashMap.newKeySet();
        Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
        AtomicInteger threadIndex = new AtomicInteger(0);

        ExecutorService deleteExecutor =
                Executors.newFixedThreadPool(
                        4,
                        runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setName("remove-orphan-" + threadIndex.getAndIncrement());
                            thread.setDaemon(true);
                            return thread;
                        });

        try {
            SparkActions.get()
                    .deleteOrphanFiles(table)
                    .executeDeleteWith(deleteExecutor)
                    .olderThan(System.currentTimeMillis() + 5000)
                    .deleteWith(
                            file -> {
                                deleteThreads.add(Thread.currentThread().getName());
                                deletedFiles.add(file);
                            })
                    .execute();
        } finally {
            deleteExecutor.shutdownNow();
        }

        // every pool thread must have performed at least one delete
        Assertions.assertThat(deleteThreads)
                .isEqualTo(
                        Sets.newHashSet(
                                "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
        Assertions.assertThat(deletedFiles).hasSize(4);
    }

    /**
     * With write-audit-publish enabled, a write made while {@code spark.wap.id} is set is staged and not
     * visible to reads. Verifies that its files are not treated as orphans. Only runs for format
     * version 2.
     */
    @TestTemplate
    public void testWapFilesAreKept() {
        Assumptions.assumeThat(formatVersion).as("currently fails with DVs").isEqualTo(2);

        Map<String, String> tableProps = Maps.newHashMap();
        tableProps.put("write.wap.enabled", "true");
        tableProps.putAll(properties);
        Table table = TABLES.create(SCHEMA, SPEC, tableProps, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class);

        // first write is published normally
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        // second write is staged under WAP id "1"
        spark.conf().set("spark.wap.id", "1");
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        List<ThreeColumnRecord> visibleRows =
                spark.read()
                        .format("iceberg")
                        .load(tableLocation)
                        .as(Encoders.bean(ThreeColumnRecord.class))
                        .collectAsList();
        Assertions.assertThat(visibleRows)
                .as("Should not return data from the staged snapshot")
                .isEqualTo(records);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should not delete any files")
                .isEmpty();
    }

    /**
     * Sets {@code write.data.path} to the table root, so data directories share a parent with the
     * metadata files. Verifies that only the single untracked Parquet file is reported and that the
     * table remains readable.
     */
    @TestTemplate
    public void testMetadataFolderIsIntact() {
        Map<String, String> tableProps = Maps.newHashMap();
        tableProps.put("write.data.path", tableLocation);
        tableProps.putAll(properties);
        Table table = TABLES.create(SCHEMA, SPEC, tableProps, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
        // untracked file inside a partition-style directory directly under the table root
        df.write().mode("append").parquet(tableLocation + "/c2_trunc=AA/c3=AAAA");

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete 1 file")
                .hasSize(1);

        Assertions.assertThat(
                        spark.read()
                                .format("iceberg")
                                .load(tableLocation)
                                .as(Encoders.bean(ThreeColumnRecord.class))
                                .collectAsList())
                .as("Rows must match")
                .isEqualTo(records);
    }

    /**
     * Writes two untracked files, records a cutoff timestamp, then writes a third untracked file at
     * least one second later. Verifies that {@code olderThan(cutoff)} reports only the two earlier
     * files.
     */
    @TestTemplate
    public void testOlderThanTimestamp() {
        Table table = TABLES.create(SCHEMA, SPEC, properties, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        String partitionDir = tableLocation + "/data/c2_trunc=AA/c3=AAAA";

        // two orphans written before the cutoff
        df.write().mode("append").parquet(partitionDir);
        df.write().mode("append").parquet(partitionDir);

        waitUntilAfter(System.currentTimeMillis());
        long cutoffMillis = System.currentTimeMillis();
        waitUntilAfter(System.currentTimeMillis() + 1000);

        // written after the cutoff, so it should not be counted
        df.write().mode("append").parquet(partitionDir);

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(cutoffMillis)
                                .execute()
                                .orphanFileLocations())
                .as("Should delete only 2 files")
                .hasSize(2);
    }

    /**
     * With {@code write.metadata.previous-versions-max} set to 1 and two appends, verifies that
     * exactly one file is reported as orphaned, that it is {@code v1.metadata.json}, and that both
     * appended rows remain readable.
     */
    @TestTemplate
    public void testRemoveUnreachableMetadataVersionFiles() {
        Map<String, String> tableProps = Maps.newHashMap();
        tableProps.put("write.data.path", tableLocation);
        tableProps.put("write.metadata.previous-versions-max", "1");
        tableProps.putAll(properties);
        Table table = TABLES.create(SCHEMA, SPEC, tableProps, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class);

        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        waitUntilAfter(System.currentTimeMillis());

        DeleteOrphanFiles.Result result =
                SparkActions.get()
                        .deleteOrphanFiles(table)
                        .olderThan(System.currentTimeMillis())
                        .execute();
        Assertions.assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
        Assertions.assertThat(result.orphanFileLocations())
                .as("Should remove v1 file")
                .anyMatch(location -> location.contains("v1.metadata.json"));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList(records);
        expectedRecords.addAll(records);
        Assertions.assertThat(
                        spark.read()
                                .format("iceberg")
                                .load(tableLocation)
                                .as(Encoders.bean(ThreeColumnRecord.class))
                                .collectAsList())
                .as("Rows must match")
                .isEqualTo(expectedRecords);
    }

    /**
     * Writes 100 rows whose {@code c2} values are all distinct, producing many top-level partition
     * directories. Verifies that no file is reported as orphaned and that every row is readable.
     */
    @TestTemplate
    public void testManyTopLevelPartitions() {
        Table table = TABLES.create(SCHEMA, SPEC, properties, tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            String value = String.valueOf(i);
            records.add(new ThreeColumnRecord(i, value, value));
        }

        spark.createDataFrame(records, ThreeColumnRecord.class)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should not delete any files")
                .isEmpty();

        long rowCount = spark.read().format("iceberg").load(tableLocation).count();
        Assertions.assertThat(rowCount).as("Rows count must match").isEqualTo(records.size());
    }

    /**
     * Writes 100 rows with only three distinct {@code c2} values but distinct {@code c3} values,
     * producing many leaf partitions under a few top-level ones. Verifies that no file is reported as
     * orphaned and that every row is readable.
     */
    @TestTemplate
    public void testManyLeafPartitions() {
        Table table = TABLES.create(SCHEMA, SPEC, properties, tableLocation);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            String topLevel = String.valueOf(i % 3);
            String leaf = String.valueOf(i);
            records.add(new ThreeColumnRecord(i, topLevel, leaf));
        }

        spark.createDataFrame(records, ThreeColumnRecord.class)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should not delete any files")
                .isEmpty();

        long rowCount = spark.read().format("iceberg").load(tableLocation).count();
        Assertions.assertThat(rowCount).as("Row count must match").isEqualTo(records.size());
    }

    /**
     * Uses a partition source column whose name starts with an underscore ({@code _c2}). Verifies that
     * untracked files inside the resulting {@code _c2_trunc=...} directories are still found: both are
     * reported.
     */
    @TestTemplate
    public void testHiddenPartitionPaths() {
        Schema schema =
                new Schema(
                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                        Types.NestedField.optional(2, "_c2", Types.StringType.get()),
                        Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).identity("c3").build();
        Table table = TABLES.create(schema, spec, properties, tableLocation);

        StructType structType =
                new StructType()
                        .add("c1", DataTypes.IntegerType)
                        .add("_c2", DataTypes.StringType)
                        .add("c3", DataTypes.StringType);
        List<Row> rows = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(rows, structType).coalesce(1);

        df.select("c1", "_c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

        String partitionDir = tableLocation + "/data/_c2_trunc=AA/c3=AAAA";
        df.write().mode("append").parquet(partitionDir);
        df.write().mode("append").parquet(partitionDir);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete 2 files")
                .hasSize(2);
    }

    /**
     * Like {@code testHiddenPartitionPaths}, but the spec evolves to add {@code _c1} as a second
     * partition field. Verifies that untracked files under both the original and the evolved
     * underscore-prefixed partition directories are reported.
     */
    @TestTemplate
    public void testHiddenPartitionPathsWithPartitionEvolution() {
        Schema schema =
                new Schema(
                        Types.NestedField.optional(1, "_c1", Types.IntegerType.get()),
                        Types.NestedField.optional(2, "_c2", Types.StringType.get()),
                        Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).build();
        Table table = TABLES.create(schema, spec, properties, tableLocation);

        StructType structType =
                new StructType()
                        .add("_c1", DataTypes.IntegerType)
                        .add("_c2", DataTypes.StringType)
                        .add("c3", DataTypes.StringType);
        List<Row> rows = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        Dataset<Row> df = spark.createDataFrame(rows, structType).coalesce(1);

        df.select("_c1", "_c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
        df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA");

        // evolve the spec: _c1 becomes an additional partition field
        table.updateSpec().addField("_c1").commit();

        df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/_c1=1");

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete 2 files")
                .hasSize(2);
    }

    /**
     * Creates a file under {@code data/_c2_trunc/}. That directory carries a partition field name but
     * no {@code =value} suffix, so it is not a partition directory. Verifies that the action reports
     * nothing and leaves the file in place.
     */
    @TestTemplate
    public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOException {
        Schema schema =
                new Schema(
                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                        Types.NestedField.optional(2, "_c2", Types.StringType.get()),
                        Types.NestedField.optional(3, "c3", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("_c2", 2).identity("c3").build();
        Table table = TABLES.create(schema, spec, properties, tableLocation);

        StructType structType =
                new StructType()
                        .add("c1", DataTypes.IntegerType)
                        .add("_c2", DataTypes.StringType)
                        .add("c3", DataTypes.StringType);
        List<Row> rows = Lists.newArrayList(RowFactory.create(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(rows, structType)
                .coalesce(1)
                .select("c1", "_c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        Path dataPath = new Path(tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        Path fileInHiddenDir = new Path(dataPath, "_c2_trunc/file.txt");
        fs.createNewFile(fileInHiddenDir);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete 0 files")
                .isEmpty();
        Assertions.assertThat(fs.exists(fileInHiddenDir)).isTrue();
    }

    /**
     * Reads the {@code file_path} column of the table's {@code #files} metadata table as of a snapshot.
     *
     * @param snapshotId the snapshot to read
     * @return the file paths listed for that snapshot
     */
    private List<String> snapshotFiles(long snapshotId) {
        return spark.read()
                .format("iceberg")
                .option("snapshot-id", snapshotId)
                .load(tableLocation + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
    }

    /**
     * Creates and writes the table through its scheme-less absolute path (rather than the
     * {@code file:} URI in {@code tableLocation}). Verifies that a dry run still reports exactly the
     * one untracked Parquet file and leaves it on disk.
     */
    @TestTemplate
    public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
        String absoluteLocation = tableDir.getAbsolutePath();
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, absoluteLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(absoluteLocation);

        List<String> validFiles =
                spark.read()
                        .format("iceberg")
                        .load(tableLocation + "#files")
                        .select("file_path")
                        .as(Encoders.STRING())
                        .collectAsList();
        Assertions.assertThat(validFiles).as("Should be 1 valid file").hasSize(1);
        String validFile = validFiles.get(0);

        df.write().mode("append").parquet(tableLocation + "/data");

        Path dataPath = new Path(tableLocation + "/data");
        FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
        List<String> allFiles =
                Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
                        .filter(status -> status.isFile())
                        .map(status -> status.getPath().toString())
                        .collect(Collectors.toList());
        Assertions.assertThat(allFiles).as("Should be 2 files").hasSize(2);

        // match by containment: presumably listed paths carry a scheme the tracked path lacks
        List<String> invalidFiles = Lists.newArrayList(allFiles);
        invalidFiles.removeIf(listed -> listed.contains(validFile));
        Assertions.assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);

        waitUntilAfter(System.currentTimeMillis());

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .deleteWith(location -> {})
                                .execute()
                                .orphanFileLocations())
                .as("Action should find 1 file")
                .isEqualTo(invalidFiles);
        Assertions.assertThat(fs.exists(new Path(invalidFiles.get(0))))
                .as("Invalid file should be present")
                .isTrue();
    }

    /**
     * Creates a table through a {@link HadoopCatalog} rooted at the test directory and adds one
     * untracked Parquet file. Verifies that exactly that file is removed and the table data stays
     * readable.
     *
     * <p>The table is created with the parameterized {@code properties}, as the other tests do, so that
     * the {@code format-version} parameter actually applies here. Previously an empty map was passed,
     * and every parameterization silently ran against the catalog's default format version.
     */
    @TestTemplate
    public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException {
        HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), tableLocation);
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("testDb"), "testTb");
        Table table =
                hadoopCatalog.createTable(identifier, SCHEMA, PartitionSpec.unpartitioned(), properties);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

        df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
        // untracked Parquet file in the table's data directory
        df.write().mode("append").parquet(table.location() + "/data");

        waitUntilAfter(System.currentTimeMillis());

        // pick up the commit made through Spark
        table.refresh();

        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis())
                                .execute()
                                .orphanFileLocations())
                .as("Should delete only 1 file")
                .hasSize(1);

        Assertions.assertThat(
                        spark.read()
                                .format("iceberg")
                                .load(table.location())
                                .as(Encoders.bean(ThreeColumnRecord.class))
                                .collectAsList())
                .as("Rows must match")
                .isEqualTo(records);
    }

    /**
     * Creates a table in the shared {@code catalog} under the {@code default} namespace and drops an
     * untracked {@code data/trashfile} into its location. Verifies that the file is reported as
     * orphaned, with a {@code file:}-prefixed path.
     */
    @TestTemplate
    public void testHiveCatalogTable() throws IOException {
        TableIdentifier identifier = TableIdentifier.of("default", randomName("hivetestorphan"));
        Table table = catalog.createTable(identifier, SCHEMA, SPEC, tableLocation, properties);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(identifier.toString());

        String localLocation = table.location().replaceFirst("file:", "");
        new File(localLocation + "/data/trashfile").createNewFile();

        String expectedOrphan = "file:" + localLocation + "/data/trashfile";
        Assertions.assertThat(
                        SparkActions.get()
                                .deleteOrphanFiles(table)
                                .olderThan(System.currentTimeMillis() + 1000)
                                .execute()
                                .orphanFileLocations())
                .as("trash file should be removed")
                .anyMatch(location -> location.contains(expectedOrphan));
    }

    /**
     * Verifies that once {@code gc.enabled} is set to {@code false}, executing the action fails with a
     * {@link ValidationException} carrying the GC-disabled message.
     */
    @TestTemplate
    public void testGarbageCollectionDisabled() {
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        table.updateProperties().set("gc.enabled", "false").commit();

        Assertions.assertThatThrownBy(() -> SparkActions.get().deleteOrphanFiles(table).execute())
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
    }

    @TestTemplate
    public void testCompareToFileList() throws IOException {
        Table create = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), this.properties, this.tableLocation);
        ArrayList newArrayList = Lists.newArrayList(new ThreeColumnRecord[]{new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")});
        Dataset coalesce = spark.createDataFrame(newArrayList, ThreeColumnRecord.class).coalesce(1);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        coalesce.select("c1", new String[]{"c2", "c3"}).write().format("iceberg").mode("append").save(this.tableLocation);
        Path path = new Path(this.tableLocation + "/data");
        FileSystem fileSystem = path.getFileSystem(spark.sessionState().newHadoopConf());
        List list = (List) Arrays.stream(fileSystem.listStatus(path, HiddenPathFilter.get())).filter((v0) -> {
            return v0.isFile();
        }).map(fileStatus -> {
            return new FilePathLastModifiedRecord(fileStatus.getPath().toString(), new Timestamp(fileStatus.getModificationTime()));
        }).collect(Collectors.toList());
        Assertions.assertThat(list).as("Should be 2 valid files", new Object[0]).hasSize(2);
        coalesce.write().mode("append").parquet(this.tableLocation + "/data");
        List list2 = (List) Arrays.stream(fileSystem.listStatus(path, HiddenPathFilter.get())).filter((v0) -> {
            return v0.isFile();
        }).map(fileStatus2 -> {
            return new FilePathLastModifiedRecord(fileStatus2.getPath().toString(), new Timestamp(fileStatus2.getModificationTime()));
        }).collect(Collectors.toList());
        Assertions.assertThat(list2).as("Should be 3 files", new Object[0]).hasSize(3);
        ArrayList newArrayList2 = Lists.newArrayList(list2);
        newArrayList2.removeAll(list);
        List list3 = (List) newArrayList2.stream().map((v0) -> {
            return v0.getFilePath();
        }).collect(Collectors.toList());
        Assertions.assertThat(newArrayList2).as("Should be 1 invalid file", new Object[0]).hasSize(1);
        waitUntilAfter(System.currentTimeMillis());
        SparkActions sparkActions = SparkActions.get();
        Dataset withColumnRenamed = spark.createDataFrame(list2, FilePathLastModifiedRecord.class).withColumnRenamed("filePath", "file_path").withColumnRenamed("lastModified", "last_modified");
        Assertions.assertThat(sparkActions.deleteOrphanFiles(create).compareToFileList(withColumnRenamed).deleteWith(str -> {
        }).execute().orphanFileLocations()).as("Default olderThan interval should be safe", new Object[0]).isEmpty();
        Assertions.assertThat(sparkActions.deleteOrphanFiles(create).compareToFileList(withColumnRenamed).olderThan(System.currentTimeMillis()).deleteWith(str2 -> {
        }).execute().orphanFileLocations()).as("Action should find 1 file", new Object[0]).isEqualTo(list3);
        ((AbstractBooleanAssert) Assertions.assertThat(fileSystem.exists(new Path((String) list3.get(0)))).as("Invalid file should be present", new Object[0])).isTrue();
        Assertions.assertThat(sparkActions.deleteOrphanFiles(create).compareToFileList(withColumnRenamed).olderThan(System.currentTimeMillis()).execute().orphanFileLocations()).as("Action should delete 1 file", new Object[0]).isEqualTo(list3);
        ((AbstractBooleanAssert) Assertions.assertThat(fileSystem.exists(new Path((String) list3.get(0)))).as("Invalid file should not be present", new Object[0])).isFalse();
        ArrayList newArrayList3 = Lists.newArrayList();
        newArrayList3.addAll(newArrayList);
        newArrayList3.addAll(newArrayList);
        Assertions.assertThat(spark.read().format("iceberg").load(this.tableLocation).as(Encoders.bean(ThreeColumnRecord.class)).collectAsList()).as("Rows must match", new Object[0]).isEqualTo(newArrayList3);
        Assertions.assertThat(sparkActions.deleteOrphanFiles(create).compareToFileList(spark.createDataFrame(Lists.newArrayList(new FilePathLastModifiedRecord[]{new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L))}), FilePathLastModifiedRecord.class).withColumnRenamed("filePath", "file_path").withColumnRenamed("lastModified", "last_modified")).deleteWith(str3 -> {
        }).execute().orphanFileLocations()).as("Action should find nothing", new Object[0]).isEmpty();
    }

    /**
     * Spins on the wall clock until it reads strictly later than the given instant.
     *
     * <p>Visibility was widened from the inherited {@code protected} to {@code public}.
     *
     * @param j timestamp in milliseconds, as returned by {@link System#currentTimeMillis()}
     * @return the first observed {@code currentTimeMillis()} value greater than {@code j}
     */
    @Override // org.apache.iceberg.spark.TestBase
    public long waitUntilAfter(long j) {
        long now = System.currentTimeMillis();
        // Busy-wait (no sleep): keep sampling until the clock has moved past j.
        while (now <= j) {
            now = System.currentTimeMillis();
        }
        return now;
    }

    /**
     * Verifies how the orphan-file action treats a Puffin statistics file.
     *
     * <p>While the file is referenced through {@code updateStatistics().setStatistics(...)}, the
     * action must keep it. Once that entry is removed with {@code removeStatistics(...)}, the
     * action must report it as the single orphan and delete it.
     */
    @TestTemplate
    public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
        Assumptions.assumeThat(this.formatVersion).isGreaterThanOrEqualTo(2);
        Table table =
                TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), this.properties, this.tableLocation);

        List<ThreeColumnRecord> records =
                Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(this.tableLocation);

        table.refresh();
        long snapshotId = table.currentSnapshot().snapshotId();
        long sequenceNumber = table.currentSnapshot().sequenceNumber();

        File statsLocation =
                new File(new URI(this.tableLocation))
                        .toPath()
                        .resolve("data")
                        .resolve("some-stats-file")
                        .toFile();

        // Scope the writer to the write itself so it is closed exactly once, before the table is
        // modified and before any assertion runs. (The previous manual try/catch kept it open for
        // the whole test and could close it a second time if a later assertion failed.)
        GenericStatisticsFile statisticsFile;
        try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
            puffinWriter.add(
                    new Blob(
                            "some-blob-type",
                            ImmutableList.of(1),
                            snapshotId,
                            sequenceNumber,
                            ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
            puffinWriter.finish();
            statisticsFile =
                    new GenericStatisticsFile(
                            snapshotId,
                            statsLocation.toString(),
                            puffinWriter.fileSize(),
                            puffinWriter.footerSize(),
                            puffinWriter.writtenBlobsMetadata().stream()
                                    .map(GenericBlobMetadata::from)
                                    .collect(ImmutableList.toImmutableList()));
        }

        // Register the stats file in table metadata.
        Transaction setStatsTransaction = table.newTransaction();
        setStatsTransaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
        setStatsTransaction.commitTransaction();

        // A referenced stats file must survive the orphan sweep untouched.
        SparkActions.get()
                .deleteOrphanFiles(table)
                .olderThan(System.currentTimeMillis() + 1000)
                .execute();
        Assertions.assertThat(statsLocation).as("stats file should exist").exists();
        Assertions.assertThat(statsLocation.length())
                .as("stats file length")
                .isEqualTo(statisticsFile.fileSizeInBytes());

        // Drop the metadata reference; the file is now unreferenced.
        Transaction removeStatsTransaction = table.newTransaction();
        removeStatsTransaction
                .updateStatistics()
                .removeStatistics(statisticsFile.snapshotId())
                .commit();
        removeStatsTransaction.commitTransaction();

        Iterable<String> orphanFileLocations =
                SparkActions.get()
                        .deleteOrphanFiles(table)
                        .olderThan(System.currentTimeMillis() + 1000)
                        .execute()
                        .orphanFileLocations();
        Assertions.assertThat(orphanFileLocations).as("Should be orphan file").hasSize(1);
        Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations))
                .as("Deleted file")
                .isEqualTo(statsLocation.toURI().toString());
        Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
    }

    /**
     * A valid path and an actual path that differ only by redundant slashes must be treated as the
     * same file, so no orphans are expected.
     */
    @TestTemplate
    public void testPathsWithExtraSlashes() {
        List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
        List<String> expectedOrphans = Lists.newArrayList();
        executeTest(validFiles, actualFiles, expectedOrphans);
    }

    /**
     * A valid location written without an authority ({@code hdfs:///...}) must still match an
     * actual location that carries one, so no orphans are expected.
     */
    @TestTemplate
    public void testPathsWithValidFileHavingNoAuthority() {
        List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
        List<String> expectedOrphans = Lists.newArrayList();
        executeTest(validFiles, actualFiles, expectedOrphans);
    }

    /**
     * Mirror of the no-authority case: the actual location lacks an authority while the valid one
     * has it. No orphans are expected.
     */
    @TestTemplate
    public void testPathsWithActualFileHavingNoAuthority() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
        List<String> expectedOrphans = Lists.newArrayList();
        executeTest(validFiles, actualFiles, expectedOrphans);
    }

    /**
     * Checks how differing schemes are handled under {@code ERROR} mode.
     *
     * <p>Without any equivalence mapping, differing schemes are rejected with a
     * {@link ValidationException}. Once both schemes are mapped to a common scheme, the same
     * inputs run cleanly with no orphans.
     */
    @TestTemplate
    public void testPathsWithEqualSchemes() {
        List<String> validFiles = Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
        Assertions.assertThatThrownBy(
                        () ->
                                executeTest(
                                        validFiles,
                                        actualFiles,
                                        Lists.newArrayList(),
                                        ImmutableMap.of(),
                                        ImmutableMap.of(),
                                        DeleteOrphanFiles.PrefixMismatchMode.ERROR))
                .isInstanceOf(ValidationException.class)
                .hasMessageStartingWith("Unable to determine whether certain files are orphan")
                .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");

        // Declaring both schemes equivalent resolves the conflict.
        Map<String, String> equalSchemes = Maps.newHashMap();
        equalSchemes.put("scheme1", "scheme");
        equalSchemes.put("scheme2", "scheme");
        executeTest(
                validFiles,
                actualFiles,
                Lists.newArrayList(),
                equalSchemes,
                ImmutableMap.of(),
                DeleteOrphanFiles.PrefixMismatchMode.ERROR);
    }

    /**
     * Checks how differing authorities are handled under {@code ERROR} mode.
     *
     * <p>Without any equivalence mapping, differing authorities are rejected with a
     * {@link ValidationException}. Once both authorities are mapped to a common authority, the
     * same inputs run cleanly with no orphans.
     */
    @TestTemplate
    public void testPathsWithEqualAuthorities() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
        Assertions.assertThatThrownBy(
                        () ->
                                executeTest(
                                        validFiles,
                                        actualFiles,
                                        Lists.newArrayList(),
                                        ImmutableMap.of(),
                                        ImmutableMap.of(),
                                        DeleteOrphanFiles.PrefixMismatchMode.ERROR))
                .isInstanceOf(ValidationException.class)
                .hasMessageStartingWith("Unable to determine whether certain files are orphan")
                .hasMessageEndingWith(
                        "Conflicting authorities/schemes: [(servicename1, servicename2)].");

        // Declaring both authorities equivalent resolves the conflict.
        Map<String, String> equalAuthorities = Maps.newHashMap();
        equalAuthorities.put("servicename1", "servicename");
        equalAuthorities.put("servicename2", "servicename");
        executeTest(
                validFiles,
                actualFiles,
                Lists.newArrayList(),
                ImmutableMap.of(),
                equalAuthorities,
                DeleteOrphanFiles.PrefixMismatchMode.ERROR);
    }

    /**
     * Under {@code DELETE} mode, an actual file whose authority conflicts with the valid file's is
     * reported as an orphan instead of raising an error.
     */
    @TestTemplate
    public void testRemoveOrphanFileActionWithDeleteMode() {
        List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
        List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
        List<String> expectedOrphans = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
        executeTest(
                validFiles,
                actualFiles,
                expectedOrphans,
                ImmutableMap.of(),
                ImmutableMap.of(),
                DeleteOrphanFiles.PrefixMismatchMode.DELETE);
    }

    /**
     * Builds a name by appending a random, dash-free UUID to the given prefix.
     *
     * <p>Visibility was widened from {@code protected} to {@code public}.
     *
     * @param str the prefix (string-concatenated, so {@code null} yields a {@code "null"} prefix)
     * @return {@code str} followed by 32 hex characters taken from a random UUID
     */
    public String randomName(String str) {
        String uuidWithoutDashes = UUID.randomUUID().toString().replace("-", "");
        return str + uuidWithoutDashes;
    }

    /**
     * Convenience overload with no scheme/authority equivalences and {@code IGNORE} prefix-mismatch
     * mode.
     *
     * @param validFiles locations referenced by table metadata
     * @param actualFiles locations found on storage
     * @param expectedOrphanFiles the orphan locations the check must report
     */
    private void executeTest(
            List<String> validFiles, List<String> actualFiles, List<String> expectedOrphanFiles) {
        Map<String, String> noEquivalences = ImmutableMap.of();
        executeTest(
                validFiles,
                actualFiles,
                expectedOrphanFiles,
                noEquivalences,
                noEquivalences,
                DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
    }

    /**
     * Runs {@code DeleteOrphanFilesSparkAction.findOrphanFiles} on the given locations and asserts
     * that the reported orphans equal {@code expectedOrphanFiles}.
     *
     * @param validFiles locations referenced by table metadata
     * @param actualFiles locations found on storage
     * @param expectedOrphanFiles the orphan locations the check must report
     * @param equalSchemes scheme equivalences passed to {@code StringToFileURI}
     * @param equalAuthorities authority equivalences passed to {@code StringToFileURI}
     * @param prefixMismatchMode how conflicting schemes/authorities are handled
     */
    private void executeTest(
            List<String> validFiles,
            List<String> actualFiles,
            List<String> expectedOrphanFiles,
            Map<String, String> equalSchemes,
            Map<String, String> equalAuthorities,
            DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode) {
        DeleteOrphanFilesSparkAction.StringToFileURI toFileUri =
                new DeleteOrphanFilesSparkAction.StringToFileURI(equalSchemes, equalAuthorities);

        Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
        Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());

        // Argument order matters: actual files first, then the valid (referenced) files.
        List<String> orphanFiles =
                DeleteOrphanFilesSparkAction.findOrphanFiles(
                        spark,
                        toFileUri.apply(actualFileDS),
                        toFileUri.apply(validFileDS),
                        prefixMismatchMode);
        Assertions.assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
    }
}
