package org.apache.iceberg.spark.extensions;

import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.class */
public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
    public TestExpireSnapshotsProcedure(String str, String str2, Map<String, String> map) {
        super(str, str2, map);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @Test
    public void testExpireSnapshotsInEmptyTable() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        assertEquals("Should not delete any files", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 0L})), sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent}));
    }

    @Test
    public void testExpireSnapshotsUsingPositionalArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        loadTable.refresh();
        Timestamp from = Timestamp.from(Instant.ofEpochMilli(loadTable.currentSnapshot().timestampMillis()));
        Assert.assertEquals("Should be 2 snapshots", 2L, Iterables.size(loadTable.snapshots()));
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L})), sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')", new Object[]{this.catalogName, this.tableIdent, from}));
        loadTable.refresh();
        Assert.assertEquals("Should expire one snapshot", 1L, Iterables.size(loadTable.snapshots()));
        sql("INSERT OVERWRITE %s VALUES (3, 'c')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{3L, "c"}), row(new Object[]{4L, "d"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
        loadTable.refresh();
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        Timestamp from2 = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        Assert.assertEquals("Should be 3 snapshots", 3L, Iterables.size(loadTable.snapshots()));
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{2L, 0L, 0L, 2L, 1L})), sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)", new Object[]{this.catalogName, this.tableIdent, from2}));
    }

    @Test
    public void testExpireSnapshotUsingNamedArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should be 2 snapshots", 2L, Iterables.size(loadTable.snapshots()));
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',retain_last => 1)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
    }

    @Test
    public void testExpireSnapshotsGCDisabled() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'false')", new Object[]{this.tableName, "gc.enabled"});
        AssertHelpers.assertThrows("Should reject call", ValidationException.class, "Cannot expire snapshots: GC is disabled", () -> {
            return sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent});
        });
    }

    @Test
    public void testInvalidExpireSnapshotsCases() {
        AssertHelpers.assertThrows("Should not allow mixed args", AnalysisException.class, "Named and positional arguments cannot be mixed", () -> {
            return sql("CALL %s.system.expire_snapshots('n', table => 't')", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces", NoSuchProcedureException.class, "not found", () -> {
            return sql("CALL %s.custom.expire_snapshots('n', 't')", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should reject calls without all required args", AnalysisException.class, "Missing required parameters", () -> {
            return sql("CALL %s.system.expire_snapshots()", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should reject calls with invalid arg types", AnalysisException.class, "Wrong arg type", () -> {
            return sql("CALL %s.system.expire_snapshots('n', 2.2)", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should reject calls with empty table identifier", IllegalArgumentException.class, "Cannot handle an empty identifier", () -> {
            return sql("CALL %s.system.expire_snapshots('')", new Object[]{this.catalogName});
        });
    }

    @Test
    public void testResolvingTableInAnotherCatalog() throws IOException {
        String str = "another_" + this.catalogName;
        spark.conf().set("spark.sql.catalog." + str, SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog." + str + ".type", "hadoop");
        spark.conf().set("spark.sql.catalog." + str + ".warehouse", "file:" + this.temp.newFolder().toString());
        sql("CREATE TABLE %s.%s (id bigint NOT NULL, data string) USING iceberg", new Object[]{str, this.tableIdent});
        AssertHelpers.assertThrows("Should reject calls for a table in another catalog", IllegalArgumentException.class, "Cannot run procedure in catalog", () -> {
            return sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, str + "." + this.tableName});
        });
    }

    @Test
    public void testConcurrentExpireSnapshots() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (3, 'c')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        assertEquals("Expiring snapshots concurrently should succeed", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 3L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',max_concurrent_deletes => %s,retain_last => 1)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent, 4}));
    }

    @Test
    public void testConcurrentExpireSnapshotsWithInvalidInput() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        AssertHelpers.assertThrows("Should throw an error when max_concurrent_deletes = 0", IllegalArgumentException.class, "max_concurrent_deletes should have value > 0", () -> {
            return sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, 0});
        });
        AssertHelpers.assertThrows("Should throw an error when max_concurrent_deletes < 0 ", IllegalArgumentException.class, "max_concurrent_deletes should have value > 0", () -> {
            return sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, -1});
        });
    }

    @Test
    public void testExpireDeleteFiles() throws Exception {
        sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read')", new Object[]{this.tableName});
        spark.createDataset(Lists.newArrayList(new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"), new SimpleRecord(4, "d")}), Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(this.tableName).append();
        sql("DELETE FROM %s WHERE id=1", new Object[]{this.tableName});
        Table loadIcebergTable = Spark3Util.loadIcebergTable(spark, this.tableName);
        Assert.assertEquals("Should have 1 delete manifest", 1L, TestHelpers.deleteManifests(loadIcebergTable).size());
        Assert.assertEquals("Should have 1 delete file", 1L, TestHelpers.deleteFiles(loadIcebergTable).size());
        Path path = new Path(((ManifestFile) TestHelpers.deleteManifests(loadIcebergTable).iterator().next()).path());
        Path path2 = new Path(String.valueOf(((DeleteFile) TestHelpers.deleteFiles(loadIcebergTable).iterator().next()).path()));
        sql("CALL %s.system.rewrite_data_files(table => '%s',options => map('delete-file-threshold','1','use-starting-sequence-number', 'false'))", new Object[]{this.catalogName, this.tableIdent});
        loadIcebergTable.refresh();
        sql("INSERT INTO TABLE %s VALUES (5, 'e')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (6, 'f')", new Object[]{this.tableName});
        loadIcebergTable.refresh();
        Assert.assertEquals("Should have no delete manifests", 0L, TestHelpers.deleteManifests(loadIcebergTable).size());
        Assert.assertEquals("Should have no delete files", 0L, TestHelpers.deleteFiles(loadIcebergTable).size());
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        Assert.assertTrue("Delete manifest should still exist", local.exists(path));
        Assert.assertTrue("Delete file should still exist", local.exists(path2));
        assertEquals("Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)", ImmutableList.of(row(new Object[]{1L, 1L, 0L, 4L, 4L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',retain_last => 1)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
        Assert.assertFalse("Delete manifest should be removed", local.exists(path));
        Assert.assertFalse("Delete file should be removed", local.exists(path2));
    }

    @Test
    public void testExpireSnapshotWithStreamResultsEnabled() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should be 2 snapshots", 2L, Iterables.size(loadTable.snapshots()));
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',retain_last => 1, stream_results => true)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
    }

    @Test
    public void testExpireSnapshotsProcedureWorksWithSqlComments() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should be 2 snapshots", 2L, Iterables.size(loadTable.snapshots()));
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L})), sql("/* CALL statement is used to expire snapshots */\n-- And we have single line comments as well \n/* And comments that span *multiple* \n lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots(   older_than => TIMESTAMP '%s',   table => '%s',   retain_last => 1)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
        loadTable.refresh();
        Assert.assertEquals("Should be 1 snapshot remaining", 1L, Iterables.size(loadTable.snapshots()));
    }
}
