package org.apache.iceberg.spark.extensions;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.class */
public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
    public TestPublishChangesProcedure(String str, String str2, Map<String, String> map) {
        super(str, str2, map);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @Test
    public void testApplyWapChangesUsingPositionalArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.conf().set("spark.wap.id", "wap_id_1");
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        assertEquals("Should not see rows from staged snapshot", ImmutableList.of(), sql("SELECT * FROM %s", new Object[]{this.tableName}));
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snapshot = (Snapshot) Iterables.getOnlyElement(loadTable.snapshots());
        List sql = sql("CALL %s.system.publish_changes('%s', '%s')", new Object[]{this.catalogName, this.tableIdent, "wap_id_1"});
        loadTable.refresh();
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{Long.valueOf(snapshot.snapshotId()), Long.valueOf(loadTable.currentSnapshot().snapshotId())})), sql);
        assertEquals("Apply of WAP changes must be successful", ImmutableList.of(row(new Object[]{1L, "a"})), sql("SELECT * FROM %s", new Object[]{this.tableName}));
    }

    @Test
    public void testApplyWapChangesUsingNamedArgs() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.conf().set("spark.wap.id", "wap_id_1");
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        assertEquals("Should not see rows from staged snapshot", ImmutableList.of(), sql("SELECT * FROM %s", new Object[]{this.tableName}));
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snapshot = (Snapshot) Iterables.getOnlyElement(loadTable.snapshots());
        List sql = sql("CALL %s.system.publish_changes(wap_id => '%s', table => '%s')", new Object[]{this.catalogName, "wap_id_1", this.tableIdent});
        loadTable.refresh();
        assertEquals("Procedure output must match", ImmutableList.of(row(new Object[]{Long.valueOf(snapshot.snapshotId()), Long.valueOf(loadTable.currentSnapshot().snapshotId())})), sql);
        assertEquals("Apply of WAP changes must be successful", ImmutableList.of(row(new Object[]{1L, "a"})), sql("SELECT * FROM %s", new Object[]{this.tableName}));
    }

    @Test
    public void testApplyWapChangesRefreshesRelationCache() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.sql("SELECT * FROM " + this.tableName + " WHERE id = 1").createOrReplaceTempView("tmp");
        spark.sql("CACHE TABLE tmp");
        assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp", new Object[0]));
        spark.conf().set("spark.wap.id", "wap_id_1");
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        assertEquals("Should not see rows from staged snapshot", ImmutableList.of(), sql("SELECT * FROM %s", new Object[]{this.tableName}));
        sql("CALL %s.system.publish_changes('%s', '%s')", new Object[]{this.catalogName, this.tableIdent, "wap_id_1"});
        assertEquals("Apply of WAP changes should be visible", ImmutableList.of(row(new Object[]{1L, "a"})), sql("SELECT * FROM tmp", new Object[0]));
        sql("UNCACHE TABLE tmp", new Object[0]);
    }

    @Test
    public void testApplyInvalidWapId() {
        sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        AssertHelpers.assertThrows("Should reject invalid wap id", ValidationException.class, "Cannot apply unknown WAP ID", () -> {
            return sql("CALL %s.system.publish_changes('%s', 'not_valid')", new Object[]{this.catalogName, this.tableIdent});
        });
    }

    @Test
    public void testInvalidApplyWapChangesCases() {
        AssertHelpers.assertThrows("Should not allow mixed args", AnalysisException.class, "Named and positional arguments cannot be mixed", () -> {
            return sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces", NoSuchProcedureException.class, "not found", () -> {
            return sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should reject calls without all required args", AnalysisException.class, "Missing required parameters", () -> {
            return sql("CALL %s.system.publish_changes('t')", new Object[]{this.catalogName});
        });
        AssertHelpers.assertThrows("Should reject calls with empty table identifier", IllegalArgumentException.class, "Cannot handle an empty identifier", () -> {
            return sql("CALL %s.system.publish_changes('', 'not_valid')", new Object[]{this.catalogName});
        });
    }
}
