package org.apache.iceberg.spark.source;

import java.util.List;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/source/TestDataFrameWriterV2.class */
public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
    @Before
    public void createTable() {
        sql("CREATE TABLE %s (id bigint, data string) USING iceberg", this.tableName);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", this.tableName);
    }

    @Test
    public void testMergeSchemaFailsWithoutWriterOption() throws Exception {
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", this.tableName, "write.spark.accept-any-schema");
        jsonToDF("id bigint, data string", "{ \"id\": 1, \"data\": \"a\" }", "{ \"id\": 2, \"data\": \"b\" }").writeTo(this.tableName).append();
        assertEquals("Should have initial 2-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a"), row(2L, "b")), sql("select * from %s order by id", this.tableName));
        Dataset<Row> jsonToDF = jsonToDF("id bigint, data string, new_col float", "{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }", "{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }");
        AssertHelpers.assertThrows("Should fail when merge-schema is not enabled on the writer", IllegalArgumentException.class, "Field new_col not found in source schema", () -> {
            try {
                jsonToDF.writeTo(this.tableName).append();
            } catch (NoSuchTableException e) {
                throw new RuntimeException((Throwable) e);
            }
        });
    }

    @Test
    public void testMergeSchemaWithoutAcceptAnySchema() throws Exception {
        jsonToDF("id bigint, data string", "{ \"id\": 1, \"data\": \"a\" }", "{ \"id\": 2, \"data\": \"b\" }").writeTo(this.tableName).append();
        assertEquals("Should have initial 2-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a"), row(2L, "b")), sql("select * from %s order by id", this.tableName));
        Dataset<Row> jsonToDF = jsonToDF("id bigint, data string, new_col float", "{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }", "{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }");
        AssertHelpers.assertThrows("Should fail when accept-any-schema is not enabled on the table", AnalysisException.class, "too many data columns", () -> {
            try {
                jsonToDF.writeTo(this.tableName).option("merge-schema", "true").append();
            } catch (NoSuchTableException e) {
                throw new RuntimeException((Throwable) e);
            }
        });
    }

    @Test
    public void testMergeSchemaSparkProperty() throws Exception {
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", this.tableName, "write.spark.accept-any-schema");
        jsonToDF("id bigint, data string", "{ \"id\": 1, \"data\": \"a\" }", "{ \"id\": 2, \"data\": \"b\" }").writeTo(this.tableName).append();
        assertEquals("Should have initial 2-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a"), row(2L, "b")), sql("select * from %s order by id", this.tableName));
        jsonToDF("id bigint, data string, new_col float", "{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }", "{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }").writeTo(this.tableName).option("mergeSchema", "true").append();
        assertEquals("Should have 3-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a", null), row(2L, "b", null), row(3L, "c", Float.valueOf(12.06f)), row(4L, "d", Float.valueOf(14.41f))), sql("select * from %s order by id", this.tableName));
    }

    @Test
    public void testMergeSchemaIcebergProperty() throws Exception {
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", this.tableName, "write.spark.accept-any-schema");
        jsonToDF("id bigint, data string", "{ \"id\": 1, \"data\": \"a\" }", "{ \"id\": 2, \"data\": \"b\" }").writeTo(this.tableName).append();
        assertEquals("Should have initial 2-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a"), row(2L, "b")), sql("select * from %s order by id", this.tableName));
        jsonToDF("id bigint, data string, new_col float", "{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }", "{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }").writeTo(this.tableName).option("merge-schema", "true").append();
        assertEquals("Should have 3-column rows", (List<Object[]>) ImmutableList.of(row(1L, "a", null), row(2L, "b", null), row(3L, "c", Float.valueOf(12.06f)), row(4L, "d", Float.valueOf(14.41f))), sql("select * from %s order by id", this.tableName));
    }
}
