package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.class */
public class TestExpireSnapshotsProcedure extends ExtensionsTestBase {
    @AfterEach
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @TestTemplate
    public void testExpireSnapshotsInEmptyTable() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        assertEquals("Should not delete any files", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 0L, 0L})), sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent}));
    }

    @TestTemplate
    public void testExpireSnapshotsUsingPositionalArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        loadTable.refresh();
        Timestamp from = Timestamp.from(Instant.ofEpochMilli(loadTable.currentSnapshot().timestampMillis()));
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')", new Object[]{this.catalogName, this.tableIdent, from}));
        loadTable.refresh();
        Assertions.assertThat(loadTable.snapshots()).as("Should expire one snapshot", new Object[0]).hasSize(1);
        sql("INSERT OVERWRITE %s VALUES (3, 'c')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{3L, "c"}), row(new Object[]{4L, "d"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
        loadTable.refresh();
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        Timestamp from2 = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        Assertions.assertThat(loadTable.snapshots()).as("Should be 3 snapshots", new Object[0]).hasSize(3);
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{2L, 0L, 0L, 2L, 1L, 0L})), sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)", new Object[]{this.catalogName, this.tableIdent, from2}));
    }

    @TestTemplate
    public void testExpireSnapshotUsingNamedArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
    }

    @TestTemplate
    public void testExpireSnapshotsGCDisabled() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'false')", new Object[]{this.tableName, "gc.enabled"});
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent});
        }).isInstanceOf(ValidationException.class).hasMessageStartingWith("Cannot expire snapshots: GC is disabled");
    }

    @TestTemplate
    public void testInvalidExpireSnapshotsCases() {
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots('n', table => 't')", new Object[]{this.catalogName});
        }).isInstanceOf(AnalysisException.class).hasMessage("Named and positional arguments cannot be mixed");
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.custom.expire_snapshots('n', 't')", new Object[]{this.catalogName});
        }).isInstanceOf(NoSuchProcedureException.class).hasMessage("Procedure custom.expire_snapshots not found");
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots()", new Object[]{this.catalogName});
        }).isInstanceOf(AnalysisException.class).hasMessage("Missing required parameters: [table]");
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots('n', 2.2)", new Object[]{this.catalogName});
        }).isInstanceOf(AnalysisException.class).hasMessage("Wrong arg type for older_than: cannot cast DecimalType(2,1) to TimestampType");
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots('')", new Object[]{this.catalogName});
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("Cannot handle an empty identifier for argument table");
    }

    @TestTemplate
    public void testResolvingTableInAnotherCatalog() throws IOException {
        String str = "another_" + this.catalogName;
        spark.conf().set("spark.sql.catalog." + str, SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog." + str + ".type", "hadoop");
        spark.conf().set("spark.sql.catalog." + str + ".warehouse", Files.createTempDirectory(this.temp, "junit", new FileAttribute[0]).toFile().toURI().toString());
        sql("CREATE TABLE %s.%s (id bigint NOT NULL, data string) USING iceberg", new Object[]{str, this.tableIdent});
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, str + "." + this.tableName});
        }).isInstanceOf(IllegalArgumentException.class).hasMessageStartingWith("Cannot run procedure in catalog");
    }

    @TestTemplate
    public void testConcurrentExpireSnapshots() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (3, 'c')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        assertEquals("Expiring snapshots concurrently should succeed", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 3L, 0L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',max_concurrent_deletes => %s)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent, 4}));
    }

    @TestTemplate
    public void testConcurrentExpireSnapshotsWithInvalidInput() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, 0});
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("max_concurrent_deletes should have value > 0, value: 0");
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, -1});
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("max_concurrent_deletes should have value > 0, value: -1");
    }

    @TestTemplate
    public void testExpireDeleteFiles() throws Exception {
        sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read')", new Object[]{this.tableName});
        spark.createDataset(Lists.newArrayList(new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"), new SimpleRecord(4, "d")}), Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(this.tableName).append();
        sql("DELETE FROM %s WHERE id=1", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(TestHelpers.deleteManifests(loadTable)).as("Should have 1 delete manifest", new Object[0]).hasSize(1);
        Assertions.assertThat(TestHelpers.deleteFiles(loadTable)).as("Should have 1 delete file", new Object[0]).hasSize(1);
        Path path = new Path(((ManifestFile) TestHelpers.deleteManifests(loadTable).iterator().next()).path());
        Path path2 = new Path(String.valueOf(((DeleteFile) TestHelpers.deleteFiles(loadTable).iterator().next()).path()));
        sql("CALL %s.system.rewrite_data_files(table => '%s',options => map('delete-file-threshold','1','use-starting-sequence-number', 'false'))", new Object[]{this.catalogName, this.tableIdent});
        loadTable.refresh();
        sql("INSERT INTO TABLE %s VALUES (5, 'e')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (6, 'f')", new Object[]{this.tableName});
        loadTable.refresh();
        Assertions.assertThat(TestHelpers.deleteManifests(loadTable)).as("Should have no delete manifests", new Object[0]).hasSize(0);
        Assertions.assertThat(TestHelpers.deleteFiles(loadTable)).as("Should have no delete files", new Object[0]).hasSize(0);
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        ((AbstractBooleanAssert) Assertions.assertThat(local.exists(path)).as("Delete manifest should still exist", new Object[0])).isTrue();
        ((AbstractBooleanAssert) Assertions.assertThat(local.exists(path2)).as("Delete file should still exist", new Object[0])).isTrue();
        assertEquals("Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)", ImmutableList.of(row(new Object[]{1L, 1L, 0L, 4L, 4L, 0L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
        ((AbstractBooleanAssert) Assertions.assertThat(local.exists(path)).as("Delete manifest should be removed", new Object[0])).isFalse();
        ((AbstractBooleanAssert) Assertions.assertThat(local.exists(path2)).as("Delete file should be removed", new Object[0])).isFalse();
    }

    @TestTemplate
    public void testExpireSnapshotWithStreamResultsEnabled() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',stream_results => true)", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
    }

    @TestTemplate
    public void testExpireSnapshotsWithSnapshotId() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        long longValue = loadTable.currentSnapshot().parentId().longValue();
        sql("CALL %s.system.expire_snapshots(table => '%s',snapshot_ids => ARRAY(%d))", new Object[]{this.catalogName, this.tableIdent, Long.valueOf(longValue)});
        loadTable.refresh();
        Assertions.assertThat(loadTable.snapshots()).as("Should be 1 snapshots", new Object[0]).hasSize(1);
        Assertions.assertThat(loadTable.snapshots()).as("Snapshot ID should not be present", new Object[0]).filteredOn(snapshot -> {
            return snapshot.snapshotId() == longValue;
        }).hasSize(0);
    }

    @TestTemplate
    public void testExpireSnapshotShouldFailForCurrentSnapshot() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        Assertions.assertThatThrownBy(() -> {
            sql("CALL %s.system.expire_snapshots(table => '%s',snapshot_ids => ARRAY(%d, %d))", new Object[]{this.catalogName, this.tableIdent, Long.valueOf(loadTable.currentSnapshot().snapshotId()), loadTable.currentSnapshot().parentId()});
        }).isInstanceOf(IllegalArgumentException.class).hasMessageStartingWith("Cannot expire");
    }

    @TestTemplate
    public void testExpireSnapshotsProcedureWorksWithSqlComments() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(loadTable.snapshots()).as("Should be 2 snapshots", new Object[0]).hasSize(2);
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), sql("/* CALL statement is used to expire snapshots */\n-- And we have single line comments as well \n/* And comments that span *multiple* \n lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots(   older_than => TIMESTAMP '%s',   table => '%s')", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}));
        loadTable.refresh();
        Assertions.assertThat(loadTable.snapshots()).as("Should be 1 snapshot remaining", new Object[0]).hasSize(1);
    }

    @TestTemplate
    public void testExpireSnapshotsWithStatisticFiles() throws Exception {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (10, 'abc')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        String statsFileLocation = ProcedureUtil.statsFileLocation(loadTable.location());
        StatisticsFile writeStatsFile = writeStatsFile(loadTable.currentSnapshot().snapshotId(), loadTable.currentSnapshot().sequenceNumber(), statsFileLocation, loadTable.io());
        loadTable.updateStatistics().setStatistics(writeStatsFile.snapshotId(), writeStatsFile).commit();
        sql("INSERT INTO %s SELECT 20, 'def'", new Object[]{this.tableName});
        loadTable.refresh();
        String statsFileLocation2 = ProcedureUtil.statsFileLocation(loadTable.location());
        StatisticsFile writeStatsFile2 = writeStatsFile(loadTable.currentSnapshot().snapshotId(), loadTable.currentSnapshot().sequenceNumber(), statsFileLocation2, loadTable.io());
        loadTable.updateStatistics().setStatistics(writeStatsFile2.snapshotId(), writeStatsFile2).commit();
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        Assertions.assertThat(((Object[]) sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}).get(0))[5]).as("should be 1 deleted statistics file", new Object[0]).isEqualTo(1L);
        loadTable.refresh();
        Assertions.assertThat(loadTable.statisticsFiles()).as("Statistics file entry in TableMetadata should be present only for the snapshot %s", new Object[]{Long.valueOf(writeStatsFile2.snapshotId())}).extracting((v0) -> {
            return v0.snapshotId();
        }).containsExactly(new Long[]{Long.valueOf(writeStatsFile2.snapshotId())});
        ((AbstractFileAssert) Assertions.assertThat(new File(statsFileLocation)).as("Statistics file should not exist for snapshot %s", new Object[]{Long.valueOf(writeStatsFile.snapshotId())})).doesNotExist();
        ((AbstractFileAssert) Assertions.assertThat(new File(statsFileLocation2)).as("Statistics file should exist for snapshot %s", new Object[]{Long.valueOf(writeStatsFile2.snapshotId())})).exists();
    }

    @TestTemplate
    public void testExpireSnapshotsWithPartitionStatisticFiles() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (10, 'abc')", new Object[]{this.tableName});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        String statsFileLocation = ProcedureUtil.statsFileLocation(loadTable.location());
        PartitionStatisticsFile writePartitionStatsFile = ProcedureUtil.writePartitionStatsFile(loadTable.currentSnapshot().snapshotId(), statsFileLocation, loadTable.io());
        loadTable.updatePartitionStatistics().setPartitionStatistics(writePartitionStatsFile).commit();
        sql("INSERT INTO %s SELECT 20, 'def'", new Object[]{this.tableName});
        loadTable.refresh();
        String statsFileLocation2 = ProcedureUtil.statsFileLocation(loadTable.location());
        PartitionStatisticsFile writePartitionStatsFile2 = ProcedureUtil.writePartitionStatsFile(loadTable.currentSnapshot().snapshotId(), statsFileLocation2, loadTable.io());
        loadTable.updatePartitionStatistics().setPartitionStatistics(writePartitionStatsFile2).commit();
        waitUntilAfter(loadTable.currentSnapshot().timestampMillis());
        Assertions.assertThat(((Object[]) sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())), this.tableIdent}).get(0))[5]).as("should be 1 deleted partition statistics file", new Object[0]).isEqualTo(1L);
        loadTable.refresh();
        Assertions.assertThat(loadTable.partitionStatisticsFiles()).as("partition statistics file entry in TableMetadata should be present only for the snapshot %s", new Object[]{Long.valueOf(writePartitionStatsFile2.snapshotId())}).extracting((v0) -> {
            return v0.snapshotId();
        }).containsExactly(new Long[]{Long.valueOf(writePartitionStatsFile2.snapshotId())});
        ((AbstractFileAssert) Assertions.assertThat(new File(statsFileLocation)).as("partition statistics file should not exist for snapshot %s", new Object[]{Long.valueOf(writePartitionStatsFile.snapshotId())})).doesNotExist();
        ((AbstractFileAssert) Assertions.assertThat(new File(statsFileLocation2)).as("partition statistics file should exist for snapshot %s", new Object[]{Long.valueOf(writePartitionStatsFile2.snapshotId())})).exists();
    }

    private static StatisticsFile writeStatsFile(long j, long j2, String str, FileIO fileIO) throws IOException {
        PuffinWriter build = Puffin.write(fileIO.newOutputFile(str)).build();
        try {
            build.add(new Blob("some-blob-type", ImmutableList.of(1), j, j2, ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
            build.finish();
            GenericStatisticsFile genericStatisticsFile = new GenericStatisticsFile(j, str, build.fileSize(), build.footerSize(), (List) build.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).collect(ImmutableList.toImmutableList()));
            if (build != null) {
                build.close();
            }
            return genericStatisticsFile;
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
