package org.apache.iceberg.spark.actions;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.DeleteReachableFiles;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.class */
public class TestDeleteReachableFilesAction extends SparkTestBase {

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private Table table;
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    private static final int SHUFFLE_PARTITIONS = 2;
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "c1", Types.IntegerType.get()), Types.NestedField.optional(SHUFFLE_PARTITIONS, "c2", Types.StringType.get()), Types.NestedField.optional(3, "c3", Types.StringType.get())});
    private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
    static final DataFile FILE_A = DataFiles.builder(SPEC).withPath("/path/to/data-a.parquet").withFileSizeInBytes(10).withPartition(TestHelpers.Row.of(new Object[]{0})).withRecordCount(1).build();
    static final DataFile FILE_B = DataFiles.builder(SPEC).withPath("/path/to/data-b.parquet").withFileSizeInBytes(10).withPartition(TestHelpers.Row.of(new Object[]{1})).withRecordCount(1).build();
    static final DataFile FILE_C = DataFiles.builder(SPEC).withPath("/path/to/data-c.parquet").withFileSizeInBytes(10).withPartition(TestHelpers.Row.of(new Object[]{Integer.valueOf(SHUFFLE_PARTITIONS)})).withRecordCount(1).build();
    static final DataFile FILE_D = DataFiles.builder(SPEC).withPath("/path/to/data-d.parquet").withFileSizeInBytes(10).withPartition(TestHelpers.Row.of(new Object[]{3})).withRecordCount(1).build();

    @Before
    public void setupTableLocation() throws Exception {
        this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.temp.newFolder().toURI().toString());
        spark.conf().set("spark.sql.shuffle.partitions", 2L);
    }

    private void checkRemoveFilesResults(long j, long j2, long j3, long j4, DeleteReachableFiles.Result result) {
        Assert.assertEquals("Incorrect number of manifest files deleted", j2, result.deletedManifestsCount());
        Assert.assertEquals("Incorrect number of datafiles deleted", j, result.deletedDataFilesCount());
        Assert.assertEquals("Incorrect number of manifest lists deleted", j3, result.deletedManifestListsCount());
        Assert.assertEquals("Incorrect number of other lists deleted", j4, result.deletedOtherFilesCount());
    }

    @Test
    public void dataFilesCleanupWithParallelTasks() {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newFastAppend().appendFile(FILE_B).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit();
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        ConcurrentHashMap.KeySetView newKeySet2 = ConcurrentHashMap.newKeySet();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        DeleteReachableFiles.Result result = (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("remove-files-" + atomicInteger.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        })).deleteWith(str -> {
            newKeySet2.add(Thread.currentThread().getName());
            newKeySet.add(str);
        }).execute();
        Assert.assertEquals(newKeySet2, Sets.newHashSet(new String[]{"remove-files-0", "remove-files-1", "remove-files-2", "remove-files-3"}));
        Lists.newArrayList(new DataFile[]{FILE_A, FILE_B, FILE_C, FILE_D}).forEach(dataFile -> {
            Assert.assertTrue("FILE_A should be deleted", newKeySet.contains(FILE_A.path().toString()));
        });
        checkRemoveFilesResults(4L, 6L, 4L, 6L, result);
    }

    @Test
    public void testWithExpiringDanglingStageCommit() {
        this.table.location();
        this.table.newAppend().appendFile(FILE_A).commit();
        ((AppendFiles) this.table.newAppend().appendFile(FILE_B).stageOnly()).commit();
        this.table.newAppend().appendFile(FILE_C).commit();
        checkRemoveFilesResults(3L, 3L, 3L, 5L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).execute());
    }

    @Test
    public void testRemoveFileActionOnEmptyTable() {
        checkRemoveFilesResults(0L, 0L, 0L, 2L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).execute());
    }

    @Test
    public void testRemoveFilesActionWithReducedVersionsTable() {
        this.table.updateProperties().set("write.metadata.previous-versions-max", "2").commit();
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_C).commit();
        this.table.newAppend().appendFile(FILE_D).commit();
        checkRemoveFilesResults(4L, 5L, 5L, 8L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).execute());
    }

    @Test
    public void testRemoveFilesAction() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        checkRemoveFilesResults(2L, 2L, 2L, 4L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).execute());
    }

    @Test
    public void testRemoveFilesActionWithDefaultIO() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        checkRemoveFilesResults(2L, 2L, 2L, 4L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).execute());
    }

    @Test
    public void testUseLocalIterator() {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        this.table.newFastAppend().appendFile(FILE_C).commit();
        int i = spark.sparkContext().dagScheduler().nextJobId().get();
        DeleteReachableFiles.Result result = (DeleteReachableFiles.Result) ((DeleteReachableFiles) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).option("stream-results", "true")).execute();
        int i2 = spark.sparkContext().dagScheduler().nextJobId().get() - i;
        checkRemoveFilesResults(3L, 4L, 3L, 5L, result);
        Assert.assertEquals("Expected total jobs to be equal to total number of shuffle partitions", i2, 2L);
    }

    @Test
    public void testIgnoreMetadataFilesNotFound() {
        this.table.updateProperties().set("write.metadata.previous-versions-max", "1").commit();
        this.table.newAppend().appendFile(FILE_A).commit();
        DeleteOrphanFiles.Result result = (DeleteOrphanFiles.Result) sparkActions().deleteOrphanFiles(this.table).olderThan(System.currentTimeMillis()).execute();
        Assert.assertEquals("Should delete 1 file", 1L, Iterables.size(result.orphanFileLocations()));
        Assert.assertTrue("Should remove v1 file", StreamSupport.stream(result.orphanFileLocations().spliterator(), false).anyMatch(str -> {
            return str.contains("v1.metadata.json");
        }));
        checkRemoveFilesResults(1L, 1L, 1L, 4L, (DeleteReachableFiles.Result) sparkActions().deleteReachableFiles(metadataLocation(this.table)).io(this.table.io()).execute());
    }

    @Test
    public void testEmptyIOThrowsException() {
        DeleteReachableFiles io = sparkActions().deleteReachableFiles(metadataLocation(this.table)).io((FileIO) null);
        io.getClass();
        AssertHelpers.assertThrows("FileIO needs to be set to use RemoveFiles action", IllegalArgumentException.class, "File IO cannot be null", io::execute);
    }

    @Test
    public void testRemoveFilesActionWhenGarbageCollectionDisabled() {
        this.table.updateProperties().set("gc.enabled", "false").commit();
        AssertHelpers.assertThrows("Should complain about removing files when GC is disabled", ValidationException.class, "Cannot remove files: GC is disabled (deleting files may corrupt other tables)", () -> {
            return sparkActions().deleteReachableFiles(metadataLocation(this.table));
        });
    }

    private String metadataLocation(Table table) {
        return ((HasTableOperations) table).operations().current().metadataFileLocation();
    }

    private ActionsProvider sparkActions() {
        return SparkActions.get();
    }
}
