package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ReplaceSortOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.types.Types;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;
import scala.Option;
import scala.Some;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestCreateActions.class */
public class TestCreateActions extends SparkCatalogTestBase {
    /** Spark parquet table partitioned by {@code id}; format args: table name, location. */
    private static final String CREATE_PARTITIONED_PARQUET =
            "CREATE TABLE %s (id INT, data STRING) "
                    + "using parquet PARTITIONED BY (id) LOCATION '%s'";

    /** Unpartitioned Spark parquet table; format args: table name, location. */
    private static final String CREATE_PARQUET =
            "CREATE TABLE %s (id INT, data STRING) "
                    + "using parquet LOCATION '%s'";

    /** External Hive parquet table partitioned by {@code id}; format args: table name, location. */
    private static final String CREATE_HIVE_EXTERNAL_PARQUET =
            "CREATE EXTERNAL TABLE %s (data STRING) "
                    + "PARTITIONED BY (id INT) STORED AS parquet LOCATION '%s'";

    /** Hive parquet table partitioned by {@code id} with no LOCATION clause; format arg: table name. */
    private static final String CREATE_HIVE_PARQUET =
            "CREATE TABLE %s (data STRING) "
                    + "PARTITIONED BY (id INT) STORED AS parquet";

    /** Value of the "default-namespace" option given to every catalog under test. */
    private static final String NAMESPACE = "default";

    /** Source of temporary directories for table data; assigned in the constructor. */
    @Rule
    public TemporaryFolder temp;

    /** Name of the shared fixture table that is recreated before each test. */
    private final String baseTableName = "baseTable";
    private File tableDir;
    private String tableLocation;
    /** Catalog backend type ("hive" or "hadoop"), read from the catalog options. */
    private final String type;
    /** The catalog under test, resolved by name from Spark's catalog manager. */
    private final TableCatalog catalog;

    /**
     * Supplies the catalog configurations this parameterized test runs against: the Spark session
     * catalog and a standalone {@link SparkCatalog}, each backed by a hive and by a hadoop catalog.
     *
     * <p>Each row is {catalog name, catalog implementation class name, catalog options}, matching
     * the constructor's parameters.
     *
     * @return one row per catalog configuration
     */
    @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
    public static Object[][] parameters() {
        // The rows must be built as Object[][]; an Object[] of rows is not assignable to the
        // declared return type.
        return new Object[][] {
            {
                "spark_catalog",
                SparkSessionCatalog.class.getName(),
                ImmutableMap.of(
                        "type", "hive",
                        "default-namespace", NAMESPACE,
                        "parquet-enabled", "true",
                        "cache-enabled", "false")
            },
            {
                "spark_catalog",
                SparkSessionCatalog.class.getName(),
                ImmutableMap.of(
                        "type", "hadoop",
                        "default-namespace", NAMESPACE,
                        "parquet-enabled", "true",
                        "cache-enabled", "false")
            },
            {
                "testhive",
                SparkCatalog.class.getName(),
                ImmutableMap.of("type", "hive", "default-namespace", NAMESPACE)
            },
            {
                "testhadoop",
                SparkCatalog.class.getName(),
                ImmutableMap.of("type", "hadoop", "default-namespace", NAMESPACE)
            }
        };
    }

    /**
     * Creates a test instance for one catalog configuration produced by {@code parameters()}.
     *
     * @param catalogName name of the Spark catalog under test
     * @param implementation fully qualified class name of the catalog implementation
     * @param config catalog options; the {@code "type"} entry selects the backend
     */
    public TestCreateActions(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
        this.temp = new TemporaryFolder();
        // baseTableName is a final field initialized at its declaration, so it is not assigned here.
        // catalogManager().catalog() returns a CatalogPlugin; the tests need table operations.
        this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
        this.type = config.get("type");
    }

    /**
     * Recreates the shared fixture table: three (id, data) rows written to a fresh temporary
     * location. Also resets the Spark SQL options that the tests depend on.
     *
     * @throws RuntimeException wrapping the IOException if the temporary folder cannot be created
     */
    @Before
    public void before() {
        // Only the folder creation throws a checked exception, so only it is guarded.
        try {
            this.tableDir = this.temp.newFolder();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.tableLocation = this.tableDir.toURI().toString();

        spark.conf().set("hive.exec.dynamic.partition", "true");
        spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        // Some tests switch on the legacy parquet layout; every test starts from the standard one.
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);

        spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
        spark.createDataFrame(
                        Lists.newArrayList(
                                new SimpleRecord(1, "a"),
                                new SimpleRecord(2, "b"),
                                new SimpleRecord(3, "c")),
                        SimpleRecord.class)
                .select("id", "data")
                .orderBy("data")
                .write()
                .mode("append")
                .option("path", this.tableLocation)
                .saveAsTable(baseTableName);
    }

    /**
     * Drops the fixture table, resets Spark's catalog manager, and unsets the session-catalog
     * options that the parameterized configurations may have set.
     */
    @After
    public void after() throws IOException {
        spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
        spark.sessionState().catalogManager().reset();
        String optionPrefix = "spark.sql.catalog.spark_catalog.";
        String[] sessionCatalogOptions = {"type", "default-namespace", "parquet-enabled", "cache-enabled"};
        for (String option : sessionCatalogOptions) {
            spark.conf().unset(optionPrefix + option);
        }
    }

    /** Migrates a partitioned Spark parquet table in place and checks the migrated file count. */
    @Test
    public void testMigratePartitioned() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String table = sourceName("test_migrate_partitioned_table");
        createSourceTable(CREATE_PARTITIONED_PARQUET, table);
        // In-place migration: source and destination are the same identifier.
        MigrateTable migration = SparkActions.get().migrateTable(table);
        assertMigratedFileCount(migration, table, table);
    }

    /**
     * Migrates a partitioned table whose data directories were written directly to the table
     * location. Only partition {@code id=0} is registered with the catalog before migration.
     */
    @Test
    public void testPartitionedTableWithUnRecoveredPartitions() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String table = sourceName("test_unrecovered_partitions");
        File dataDir = this.temp.newFolder();
        sql(CREATE_PARTITIONED_PARQUET, table, dataDir);

        // Write partition directories straight into the table location, bypassing the catalog.
        spark.range(5L)
                .selectExpr("id", "cast(id as STRING) as data")
                .write()
                .partitionBy("id")
                .mode(SaveMode.Overwrite)
                .parquet(dataDir.toURI().toString());
        sql("ALTER TABLE %s ADD PARTITION(id=0)", table);

        assertMigratedFileCount(SparkActions.get().migrateTable(table), table, table);
    }

    /**
     * Migrates a partitioned table where partition {@code id=0} points at a directory outside the
     * table's own location.
     */
    @Test
    public void testPartitionedTableWithCustomPartitions() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String table = sourceName("test_custom_parts");
        File tableRoot = this.temp.newFolder();
        File partitionRoot = this.temp.newFolder();
        spark.sql(String.format(CREATE_PARTITIONED_PARQUET, table, tableRoot));

        String partitionUri = partitionRoot.toURI().toString();
        spark.range(10L)
                .selectExpr("cast(id as STRING) as data")
                .write()
                .mode(SaveMode.Overwrite)
                .parquet(partitionUri);
        sql("ALTER TABLE %s ADD PARTITION(id=0) LOCATION '%s'", table, partitionUri);

        assertMigratedFileCount(SparkActions.get().migrateTable(table), table, table);
    }

    /**
     * Migrates an unpartitioned parquet table, then adds a column through the Iceberg API and a
     * second one through Spark SQL. After each addition it checks that the existing rows read back
     * with nulls in the new columns.
     */
    @Test
    public void testAddColumnOnMigratedTableAtEnd() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String tableName = sourceName("test_add_column_migrated_table");
        createSourceTable(CREATE_PARQUET, tableName);
        // Capture the expected rows before migration: existing data plus one or two null columns.
        List<Object[]> expectedAfterFirstAdd = sql("select *, null from %s order by id", tableName);
        List<Object[]> expectedAfterSecondAdd = sql("select *, null, null from %s order by id", tableName);

        SparkActions.get().migrateTable(tableName).execute();
        SparkTable sparkTable = loadTable(tableName);
        Table icebergTable = sparkTable.table();

        // First column: added through the Iceberg schema API.
        Schema schemaBefore = icebergTable.schema();
        sparkTable.table().updateSchema().addColumn("newCol1", Types.IntegerType.get()).commit();
        Schema schemaAfter = icebergTable.schema();
        Assert.assertNull(schemaBefore.findField("newCol1"));
        Assert.assertNotNull(schemaAfter.findField("newCol1"));

        List<Object[]> rowsAfterFirstAdd = sql("select * from %s order by id", tableName);
        Assert.assertFalse(rowsAfterFirstAdd.isEmpty());
        // Expected rows first, actual rows second, as in the other assertions in this class.
        assertEquals("Output must match", expectedAfterFirstAdd, rowsAfterFirstAdd);

        // Second column: added through Spark SQL DDL.
        sql("ALTER TABLE %s ADD COLUMN %s INT", tableName, "newCol2");
        Assert.assertTrue(Arrays.asList(spark.table(tableName).schema().fieldNames()).contains("newCol2"));

        List<Object[]> rowsAfterSecondAdd = sql("select * from %s order by id", tableName);
        Assert.assertFalse(rowsAfterSecondAdd.isEmpty());
        assertEquals("Output must match", expectedAfterSecondAdd, rowsAfterSecondAdd);
    }

    /**
     * Migrates an unpartitioned parquet table and inserts a new column between {@code id} and
     * {@code data} through the Iceberg API. Checks that existing rows read back with a null there.
     */
    @Test
    public void testAddColumnOnMigratedTableAtMiddle() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String tableName = sourceName("test_add_column_migrated_table_middle");
        createSourceTable(CREATE_PARQUET, tableName);
        SparkActions.get().migrateTable(tableName).execute();
        SparkTable sparkTable = loadTable(tableName);
        Table icebergTable = sparkTable.table();
        List<Object[]> expected = sql("select id, null, data from %s order by id", tableName);

        Schema schemaBefore = icebergTable.schema();
        sparkTable.table()
                .updateSchema()
                .addColumn("newCol", Types.IntegerType.get())
                .moveAfter("newCol", "id")
                .commit();
        Schema schemaAfter = icebergTable.schema();
        Assert.assertNull(schemaBefore.findField("newCol"));
        Assert.assertNotNull(schemaAfter.findField("newCol"));

        List<Object[]> actual = sql("select * from %s order by id", tableName);
        Assert.assertFalse(actual.isEmpty());
        // Expected rows first, actual rows second, as in the other assertions in this class.
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Migrates a table with two trailing columns. Drops {@code newCol1} through the Iceberg API and
     * {@code newCol2} through Spark SQL, and checks the remaining rows after each drop.
     */
    @Test
    public void removeColumnsAtEnd() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String tableName = sourceName("test_remove_column_migrated_table");
        spark.range(10L)
                .selectExpr("cast(id as INT)", "CAST(id as INT) newCol1", "CAST(id as INT) newCol2")
                .write()
                .mode(SaveMode.Overwrite)
                .saveAsTable(tableName);
        // After newCol1 is dropped the surviving columns are (id, newCol2). Build the expectation
        // from newCol2; newCol1 only matched by accident because both columns hold the same values.
        List<Object[]> expectedAfterFirstDrop = sql("select id, %s from %s order by id", "newCol2", tableName);
        List<Object[]> expectedAfterSecondDrop = sql("select id from %s order by id", tableName);

        SparkActions.get().migrateTable(tableName).execute();
        SparkTable sparkTable = loadTable(tableName);
        Table icebergTable = sparkTable.table();

        // First drop: through the Iceberg schema API.
        Schema schemaBefore = icebergTable.schema();
        sparkTable.table().updateSchema().deleteColumn("newCol1").commit();
        Schema schemaAfter = icebergTable.schema();
        Assert.assertNotNull(schemaBefore.findField("newCol1"));
        Assert.assertNull(schemaAfter.findField("newCol1"));

        List<Object[]> rowsAfterFirstDrop = sql("select * from %s order by id", tableName);
        Assert.assertFalse(rowsAfterFirstDrop.isEmpty());
        assertEquals("Output must match", expectedAfterFirstDrop, rowsAfterFirstDrop);

        // Second drop: through Spark SQL DDL.
        sql("ALTER TABLE %s DROP COLUMN %s", tableName, "newCol2");
        Assert.assertFalse(Arrays.asList(spark.table(tableName).schema().fieldNames()).contains("newCol2"));

        List<Object[]> rowsAfterSecondDrop = sql("select * from %s order by id", tableName);
        Assert.assertFalse(rowsAfterSecondDrop.isEmpty());
        assertEquals("Output must match", expectedAfterSecondDrop, rowsAfterSecondDrop);
    }

    /** Migrates a three-column table, drops the middle column via Spark SQL, and checks the rows. */
    @Test
    public void removeColumnFromMiddle() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String tableName = sourceName("test_remove_column_migrated_table_from_middle");
        String droppedColumn = "col1";
        spark.range(10L)
                .selectExpr("cast(id as INT)", "CAST(id as INT) as " + droppedColumn, "CAST(id as INT) as col2")
                .write()
                .mode(SaveMode.Overwrite)
                .saveAsTable(tableName);
        List<Object[]> expected = sql("select id, col2 from %s order by id", tableName);

        SparkActions.get().migrateTable(tableName).execute();
        sql("ALTER TABLE %s DROP COLUMN %s", tableName, droppedColumn);

        String[] remainingColumns = spark.table(tableName).schema().fieldNames();
        Assert.assertFalse(Arrays.asList(remainingColumns).contains(droppedColumn));
        List<Object[]> actual = sql("select * from %s order by id", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /** Migrates an unpartitioned Spark parquet table in place and checks the migrated file count. */
    @Test
    public void testMigrateUnpartitioned() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String table = sourceName("test_migrate_unpartitioned_table");
        createSourceTable(CREATE_PARQUET, table);
        MigrateTable migration = SparkActions.get().migrateTable(table);
        assertMigratedFileCount(migration, table, table);
    }

    /** Snapshots a partitioned parquet table to an explicit location and checks the result. */
    @Test
    public void testSnapshotPartitioned() throws Exception {
        Assume.assumeTrue("Cannot snapshot with arbitrary location in a hadoop based catalog", !this.type.equals("hadoop"));
        File snapshotLocation = this.temp.newFolder();
        String source = sourceName("test_snapshot_partitioned_table");
        String dest = destName("iceberg_snapshot_partitioned");
        createSourceTable(CREATE_PARTITIONED_PARQUET, source);

        SnapshotTable snapshot = SparkActions.get()
                .snapshotTable(source)
                .as(dest)
                .tableLocation(snapshotLocation.toString());
        assertSnapshotFileCount(snapshot, source, dest);
        assertIsolatedSnapshot(source, dest);
    }

    /** Snapshots an unpartitioned parquet table to an explicit location and checks the result. */
    @Test
    public void testSnapshotUnpartitioned() throws Exception {
        Assume.assumeTrue("Cannot snapshot with arbitrary location in a hadoop based catalog", !this.type.equals("hadoop"));
        File snapshotLocation = this.temp.newFolder();
        String source = sourceName("test_snapshot_unpartitioned_table");
        String dest = destName("iceberg_snapshot_unpartitioned");
        createSourceTable(CREATE_PARQUET, source);

        SnapshotTable snapshot = SparkActions.get()
                .snapshotTable(source)
                .as(dest)
                .tableLocation(snapshotLocation.toString());
        assertSnapshotFileCount(snapshot, source, dest);
        assertIsolatedSnapshot(source, dest);
    }

    /** Snapshots an external, partitioned Hive parquet table to an explicit location. */
    @Test
    public void testSnapshotHiveTable() throws Exception {
        Assume.assumeTrue("Cannot snapshot with arbitrary location in a hadoop based catalog", !this.type.equals("hadoop"));
        File snapshotLocation = this.temp.newFolder();
        String source = sourceName("snapshot_hive_table");
        String dest = destName("iceberg_snapshot_hive_table");
        createSourceTable(CREATE_HIVE_EXTERNAL_PARQUET, source);

        SnapshotTable snapshot = SparkActions.get()
                .snapshotTable(source)
                .as(dest)
                .tableLocation(snapshotLocation.toString());
        assertSnapshotFileCount(snapshot, source, dest);
        assertIsolatedSnapshot(source, dest);
    }

    /** Migrates an external, partitioned Hive parquet table in place and checks the file count. */
    @Test
    public void testMigrateHiveTable() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        String table = sourceName("migrate_hive_table");
        createSourceTable(CREATE_HIVE_EXTERNAL_PARQUET, table);
        MigrateTable migration = SparkActions.get().migrateTable(table);
        assertMigratedFileCount(migration, table, table);
    }

    /** Snapshots a managed (no LOCATION clause) Hive parquet table to an explicit location. */
    @Test
    public void testSnapshotManagedHiveTable() throws Exception {
        // This test snapshots to an explicit location, so it uses the same skip reason as the other
        // location-based snapshot tests rather than the migrate message.
        Assume.assumeTrue("Cannot snapshot with arbitrary location in a hadoop based catalog", !this.type.equals("hadoop"));
        File snapshotLocation = this.temp.newFolder();
        String source = sourceName("snapshot_managed_hive_table");
        String dest = destName("iceberg_snapshot_managed_hive_table");
        createSourceTable(CREATE_HIVE_PARQUET, source);
        assertSnapshotFileCount(
                SparkActions.get().snapshotTable(source).as(dest).tableLocation(snapshotLocation.toString()),
                source,
                dest);
        assertIsolatedSnapshot(source, dest);
    }

    /**
     * Imports a managed Hive parquet table into a new table at an explicit location and checks the
     * imported file count.
     *
     * <p>NOTE(review): despite its name, this test calls {@code snapshotTable}, not
     * {@code migrateTable}. Confirm whether an in-place migration was intended.
     */
    @Test
    public void testMigrateManagedHiveTable() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        File targetLocation = this.temp.newFolder();
        String source = sourceName("migrate_managed_hive_table");
        String dest = destName("iceberg_migrate_managed_hive_table");
        createSourceTable(CREATE_HIVE_PARQUET, source);
        SnapshotTable action = SparkActions.get()
                .snapshotTable(source)
                .as(dest)
                .tableLocation(targetLocation.toString());
        assertSnapshotFileCount(action, source, dest);
    }

    /**
     * Checks that a snapshot carries over the source table's properties and also receives any
     * property set on the snapshot action itself.
     */
    @Test
    public void testProperties() throws Exception {
        String source = sourceName("test_properties_table");
        String dest = destName("iceberg_properties");
        Map<String, String> sourceProperties = Maps.newHashMap();
        sourceProperties.put("city", "New Orleans");
        sourceProperties.put("note", "Jazz");
        createSourceTable(CREATE_PARQUET, source);
        for (Map.Entry<String, String> property : sourceProperties.entrySet()) {
            spark.sql(String.format("ALTER TABLE %s SET TBLPROPERTIES (\"%s\" = \"%s\")",
                    source, property.getKey(), property.getValue()));
        }

        assertSnapshotFileCount(
                SparkActions.get().snapshotTable(source).as(dest).tableProperty("dogs", "sundance"),
                source,
                dest);
        SparkTable snapshot = loadTable(dest);

        // Expected: every source property plus the one set on the action.
        Map<String, String> expectedProperties = Maps.newHashMap();
        expectedProperties.putAll(sourceProperties);
        expectedProperties.put("dogs", "sundance");
        for (Map.Entry<String, String> property : expectedProperties.entrySet()) {
            Assert.assertTrue("Created table missing property " + property.getKey(),
                    snapshot.properties().containsKey(property.getKey()));
            Assert.assertEquals("Property value is not the expected value",
                    property.getValue(), snapshot.properties().get(property.getKey()));
        }
    }

    /**
     * Checks the reserved properties (provider, format, snapshot id, location, sort order, format
     * version, identifier fields) that a snapshotted table exposes through Spark.
     */
    @Test
    public void testSparkTableReservedProperties() throws Exception {
        String source = sourceName("test_reserved_properties_table");
        String dest = destName("iceberg_reserved_properties");
        createSourceTable(CREATE_PARQUET, source);

        SnapshotTableSparkAction action = SparkActions.get().snapshotTable(source).as(dest);
        action.tableProperty("format-version", "1");
        assertSnapshotFileCount(action, source, dest);

        SparkTable sparkTable = loadTable(dest);
        sparkTable.table().replaceSortOrder().asc("id").desc("data").commit();

        // properties() is re-read for every check so that later commits are observed.
        String[] reservedKeys = {"provider", "format", "current-snapshot-id", "location", "sort-order"};
        for (String key : reservedKeys) {
            Assert.assertTrue("Created table missing reserved property " + key,
                    sparkTable.properties().containsKey(key));
        }
        Assert.assertEquals("Unexpected provider", "iceberg", sparkTable.properties().get("provider"));
        Assert.assertEquals("Unexpected format", "iceberg/parquet", sparkTable.properties().get("format"));
        Assert.assertNotEquals("No current-snapshot-id found", "none",
                sparkTable.properties().get("current-snapshot-id"));
        Assert.assertTrue("Location isn't correct",
                sparkTable.properties().get("location").endsWith("iceberg_reserved_properties"));

        Assert.assertEquals("Unexpected format-version", "1", sparkTable.properties().get("format-version"));
        sparkTable.table().updateProperties().set("format-version", "2").commit();
        Assert.assertEquals("Unexpected format-version", "2", sparkTable.properties().get("format-version"));

        Assert.assertEquals("Sort-order isn't correct", "id ASC NULLS FIRST, data DESC NULLS LAST",
                sparkTable.properties().get("sort-order"));

        Assert.assertNull("Identifier fields should be null", sparkTable.properties().get("identifier-fields"));
        sparkTable.table()
                .updateSchema()
                .allowIncompatibleChanges()
                .requireColumn("id")
                .setIdentifierFields("id")
                .commit();
        Assert.assertEquals("Identifier fields aren't correct", "[id]",
                sparkTable.properties().get("identifier-fields"));
    }

    /** Snapshots a partitioned parquet table without specifying a table location. */
    @Test
    public void testSnapshotDefaultLocation() throws Exception {
        String source = sourceName("test_snapshot_default");
        String dest = destName("iceberg_snapshot_default");
        createSourceTable(CREATE_PARTITIONED_PARQUET, source);
        // No tableLocation() call, so the snapshot is placed wherever the catalog defaults to.
        SnapshotTable snapshot = SparkActions.get().snapshotTable(source).as(dest);
        assertSnapshotFileCount(snapshot, source, dest);
        assertIsolatedSnapshot(source, dest);
    }

    /**
     * Builds a table from parquet files with differing schemas (only the second batch has
     * {@code col1}), migrates it, and checks reads before and after inserting a column mid-schema.
     */
    @Test
    public void schemaEvolutionTestWithSparkAPI() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        File location = this.temp.newFolder();
        String tableName = sourceName("schema_evolution_test");
        String dataPath = location.toURI().toString();

        spark.range(0L, 5L)
                .selectExpr("CAST(id as INT) as col0", "CAST(id AS FLOAT) col2", "CAST(id AS LONG) col3")
                .write()
                .mode(SaveMode.Append)
                .parquet(dataPath);
        Dataset<?> widerBatch = spark.range(6L, 10L)
                .selectExpr("CAST(id as INT) as col0", "CAST(id AS STRING) col1", "CAST(id AS FLOAT) col2",
                        "CAST(id AS LONG) col3");
        widerBatch.write().mode(SaveMode.Append).parquet(dataPath);
        // Read every file with the wider schema so the files lacking col1 can be included.
        spark.read().schema(widerBatch.schema()).parquet(dataPath).write().saveAsTable(tableName);

        List<Object[]> expectedBeforeAdd = sql("SELECT * FROM %s ORDER BY col0", tableName);
        List<Object[]> expectedAfterAdd = sql("SELECT col0, null, col1, col2, col3 FROM %s ORDER BY col0", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        assertEquals("Output must match", expectedBeforeAdd, sql("SELECT * FROM %s ORDER BY col0", tableName));

        loadTable(tableName).table()
                .updateSchema()
                .addColumn("newCol", Types.IntegerType.get())
                .moveAfter("newCol", "col0")
                .commit();
        assertEquals("Output must match", expectedAfterAdd, sql("SELECT * FROM %s ORDER BY col0", tableName));
    }

    /**
     * Evolves a Spark table with {@code ALTER TABLE ... ADD COLUMN}, migrates it, and checks reads
     * before and after inserting a column mid-schema through the Iceberg API.
     */
    @Test
    public void schemaEvolutionTestWithSparkSQL() throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        Assume.assumeTrue("Can only migrate from Spark Session Catalog", this.catalog.name().equals("spark_catalog"));
        String tableName = sourceName("schema_evolution_test_sql");
        spark.range(0L, 5L)
                .selectExpr("CAST(id as INT) col0", "CAST(id AS FLOAT) col1", "CAST(id AS STRING) col2")
                .write()
                .mode(SaveMode.Append)
                .saveAsTable(tableName);
        sql("ALTER TABLE %s ADD COLUMN col3 INT", tableName);
        // createOrReplaceTempView replaces the deprecated registerTempTable.
        spark.range(6L, 10L)
                .selectExpr("CAST(id AS INT) col0", "CAST(id AS FLOAT) col1", "CAST(id AS STRING) col2",
                        "CAST(id AS INT) col3")
                .createOrReplaceTempView("tempdata");
        sql("INSERT INTO TABLE %s SELECT * FROM tempdata", tableName);

        List<Object[]> expectedBeforeAdd = sql("SELECT * FROM %s ORDER BY col0", tableName);
        List<Object[]> expectedAfterAdd = sql("SELECT col0, null, col1, col2, col3 FROM %s ORDER BY col0", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        assertEquals("Output must match", expectedBeforeAdd, sql("SELECT * FROM %s ORDER BY col0", tableName));

        loadTable(tableName).table()
                .updateSchema()
                .addColumn("newCol", Types.IntegerType.get())
                .moveAfter("newCol", "col0")
                .commit();
        assertEquals("Output must match", expectedAfterAdd, sql("SELECT * FROM %s ORDER BY col0", tableName));
    }

    /** {@code ARRAY<STRUCT<INT>>} migration with data written in Spark's legacy parquet format. */
    @Test
    public void testHiveStyleThreeLevelList() throws Exception {
        boolean legacyParquetFormat = true;
        threeLevelList(legacyParquetFormat);
    }

    /** {@code ARRAY<STRUCT<INT>>} migration with data written in Spark's standard parquet format. */
    @Test
    public void testThreeLevelList() throws Exception {
        boolean legacyParquetFormat = false;
        threeLevelList(legacyParquetFormat);
    }

    /** List-of-nested-struct migration with data written in Spark's legacy parquet format. */
    @Test
    public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
        boolean legacyParquetFormat = true;
        threeLevelListWithNestedStruct(legacyParquetFormat);
    }

    /** List-of-nested-struct migration with data written in Spark's standard parquet format. */
    @Test
    public void testThreeLevelListWithNestedStruct() throws Exception {
        boolean legacyParquetFormat = false;
        threeLevelListWithNestedStruct(legacyParquetFormat);
    }

    /** Two-list-column migration with data written in Spark's legacy parquet format. */
    @Test
    public void testHiveStyleThreeLevelLists() throws Exception {
        boolean legacyParquetFormat = true;
        threeLevelLists(legacyParquetFormat);
    }

    /** Two-list-column migration with data written in Spark's standard parquet format. */
    @Test
    public void testThreeLevelLists() throws Exception {
        boolean legacyParquetFormat = false;
        threeLevelLists(legacyParquetFormat);
    }

    /** Struct-wrapping-a-list migration with data written in Spark's legacy parquet format. */
    @Test
    public void testHiveStyleStructOfThreeLevelLists() throws Exception {
        boolean legacyParquetFormat = true;
        structOfThreeLevelLists(legacyParquetFormat);
    }

    /** Struct-wrapping-a-list migration with data written in Spark's standard parquet format. */
    @Test
    public void testStructOfThreeLevelLists() throws Exception {
        boolean legacyParquetFormat = false;
        structOfThreeLevelLists(legacyParquetFormat);
    }

    /**
     * Writes a parquet file in Spark's legacy list layout, verifies its physical schema, then
     * migrates an external Hive table over it and checks that the rows read back unchanged.
     */
    @Test
    public void testTwoLevelList() throws IOException {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", true);
        String tableName = sourceName("testTwoLevelList");
        File location = this.temp.newFolder();

        StructType sparkSchema = new StructType(new StructField[] {
            new StructField(
                    "col1",
                    new ArrayType(
                            new StructType(new StructField[] {
                                new StructField("col2", DataTypes.IntegerType, false, Metadata.empty())
                            }),
                            false),
                    true,
                    Metadata.empty())
        });

        // Write a single parquet file holding one row.
        spark.read()
                .schema(sparkSchema)
                .json(JavaSparkContext.fromSparkContext(spark.sparkContext())
                        .parallelize(Collections.singletonList("{\"col1\": [{\"col2\": 1}]}")))
                .coalesce(1)
                .write()
                .format("parquet")
                .mode(SaveMode.Append)
                .save(location.getPath());

        FilenameFilter parquetFiles = (dir, name) -> name.endsWith("parquet");
        File parquetFile = Arrays.stream(Preconditions.checkNotNull(location.listFiles(parquetFiles)))
                .findAny()
                .orElseThrow(() -> new IllegalStateException("No parquet file written to " + location));

        // The list element is a repeated group named "array": the legacy two-level layout.
        String expectedParquetSchema = "message spark_schema {\n"
                + "  optional group col1 (LIST) {\n"
                + "    repeated group array {\n"
                + "      required int32 col2;\n"
                + "    }\n"
                + "  }\n"
                + "}\n";
        // ParquetFileReader is Closeable; close it once the footer has been read.
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(
                new Path(parquetFile.getPath()), spark.sessionState().newHadoopConf()))) {
            Assert.assertEquals(
                    MessageTypeParser.parseMessageType(expectedParquetSchema),
                    reader.getFooter().getFileMetaData().getSchema());
        }

        sql("CREATE EXTERNAL TABLE %s (col1 ARRAY<STRUCT<col2 INT>>) STORED AS parquet LOCATION '%s'",
                tableName, location);
        List<Object[]> expected = sql("select array(struct(1))");
        SparkActions.get().migrateTable(tableName).execute();
        List<Object[]> actual = sql("SELECT * FROM %s", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Migrates a Hive parquet table with an {@code ARRAY<STRUCT<col2 INT>>} column and checks that
     * the rows read back unchanged.
     *
     * @param useLegacyMode value for {@code spark.sql.parquet.writeLegacyFormat} while writing data
     */
    private void threeLevelList(boolean useLegacyMode) throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
        String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
        sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>) STORED AS parquet LOCATION '%s'",
                tableName, this.temp.newFolder());
        sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, 12345);
        // Let sql() do the formatting like every other query here; pre-formatting the query and
        // then formatting it again would misread any '%' in the table name.
        List<Object[]> expected = sql("SELECT * FROM %s", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        List<Object[]> actual = sql("SELECT * FROM %s", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Migrates a Hive parquet table with an {@code ARRAY<STRUCT<col2 STRUCT<col3 INT>>>} column and
     * checks that the rows read back unchanged.
     *
     * @param useLegacyMode value for {@code spark.sql.parquet.writeLegacyFormat} while writing data
     */
    private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
        String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
        sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 STRUCT<col3 INT>>>) STORED AS parquet LOCATION '%s'",
                tableName, this.temp.newFolder());
        sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, 12345);
        // Let sql() do the formatting; pre-formatting would make it format the query twice.
        List<Object[]> expected = sql("SELECT * FROM %s", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        List<Object[]> actual = sql("SELECT * FROM %s", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Migrates a Hive parquet table with two {@code ARRAY<STRUCT<INT>>} columns and checks that the
     * rows read back unchanged.
     *
     * @param useLegacyMode value for {@code spark.sql.parquet.writeLegacyFormat} while writing data
     */
    private void threeLevelLists(boolean useLegacyMode) throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
        String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
        sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>, col3 ARRAY<STRUCT<col4 INT>>) STORED AS parquet LOCATION '%s'",
                tableName, this.temp.newFolder());
        sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))", tableName, 12345, 987654);
        // Let sql() do the formatting; pre-formatting would make it format the query twice.
        List<Object[]> expected = sql("SELECT * FROM %s", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        List<Object[]> actual = sql("SELECT * FROM %s", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Migrates a Hive parquet table with a {@code STRUCT<col2 ARRAY<STRUCT<col3 INT>>>} column and
     * checks that the rows read back unchanged.
     *
     * @param useLegacyMode value for {@code spark.sql.parquet.writeLegacyFormat} while writing data
     */
    private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
        Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !this.type.equals("hadoop"));
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
        String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
        sql("CREATE TABLE %s (col1 STRUCT<col2 ARRAY<STRUCT<col3 INT>>>) STORED AS parquet LOCATION '%s'",
                tableName, this.temp.newFolder());
        sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))", tableName, 12345);
        // Let sql() do the formatting; pre-formatting would make it format the query twice.
        List<Object[]> expected = sql("SELECT * FROM %s", tableName);
        SparkActions.get().migrateTable(tableName).execute();
        List<Object[]> actual = sql("SELECT * FROM %s", tableName);
        Assert.assertFalse(actual.isEmpty());
        assertEquals("Output must match", expected, actual);
    }

    /**
     * Loads {@code name} from the catalog under test as an Iceberg {@link SparkTable}.
     *
     * @param name table name, resolved through {@code Spark3Util.catalogAndIdentifier}
     * @return the loaded table
     * @throws ClassCastException if the catalog returns a table that is not a {@link SparkTable}
     */
    private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
        Identifier ident = Spark3Util.catalogAndIdentifier(spark, name).identifier();
        // TableCatalog.loadTable returns the generic connector Table; the tests need SparkTable.
        return (SparkTable) this.catalog.loadTable(ident);
    }

    /**
     * Fetches the session catalog's metadata for {@code name}, using the first namespace level as
     * the database.
     */
    private CatalogTable loadSessionTable(String name) throws NoSuchTableException, NoSuchDatabaseException, ParseException {
        Identifier ident = Spark3Util.catalogAndIdentifier(spark, name).identifier();
        String database = ident.namespace()[0];
        TableIdentifier sessionIdent = new TableIdentifier(ident.name(), Some.apply(database));
        return spark.sessionState().catalog().getTableMetadata(sessionIdent);
    }

    /**
     * Creates a source table from a DDL template, then fills it from {@code baseTable}.
     *
     * @param str  DDL format string taking the table name and then a fresh temporary location
     * @param str2 name of the table to create
     */
    private void createSourceTable(String str, String str2) throws IOException, NoSuchTableException, NoSuchDatabaseException, ParseException {
        File location = this.temp.newFolder();
        String ddl = String.format(str, str2, location);
        spark.sql(ddl);

        // Write with the provider and partitioning the session catalog recorded for the new table.
        CatalogTable created = loadSessionTable(str2);
        spark.table("baseTable")
            .write()
            .mode(SaveMode.Append)
            .format((String) created.provider().get())
            .partitionBy(created.partitionColumnNames().toSeq())
            .saveAsTable(str2);
    }

    /**
     * Runs {@code migrateTable}, validates the resulting table, and checks that the number of
     * migrated data files equals the number of files found on disk beforehand.
     *
     * @param migrateTable configured migrate action to execute
     * @param str          source table name, used for the expected file count and row comparison
     * @param str2         table name checked after migration
     */
    private void assertMigratedFileCount(MigrateTable migrateTable, String str, String str2) throws NoSuchTableException, NoSuchDatabaseException, ParseException {
        // The on-disk count is taken before the action runs; keep this ordering.
        long filesOnDisk = expectedFilesCount(str);
        MigrateTable.Result migration = migrateTable.execute();

        validateTables(str, str2);

        long migratedFiles = migration.migratedDataFilesCount();
        Assert.assertEquals("Expected number of migrated files", filesOnDisk, migratedFiles);
    }

    /**
     * Runs {@code snapshotTable}, validates the snapshot against its source, and checks that the
     * number of imported data files equals the number of files found on disk beforehand.
     *
     * @param snapshotTable configured snapshot action to execute
     * @param str           source table name
     * @param str2          snapshot table name
     */
    private void assertSnapshotFileCount(SnapshotTable snapshotTable, String str, String str2) throws NoSuchTableException, NoSuchDatabaseException, ParseException {
        // The on-disk count is taken before the action runs; keep this ordering.
        long filesOnDisk = expectedFilesCount(str);
        SnapshotTable.Result snapshot = snapshotTable.execute();

        validateTables(str, str2);

        long importedFiles = snapshot.importedDataFilesCount();
        Assert.assertEquals("Expected number of imported snapshot files", filesOnDisk, importedFiles);
    }

    /**
     * Asserts that {@code str2} is an Iceberg table holding exactly the rows of {@code str}.
     *
     * <p>Rows are compared as multisets: every row must appear the same number of times in both
     * tables. A two-way {@code containsAll} check alone would accept, for example, a destination in
     * which a data file had been imported twice and every row is duplicated.
     *
     * @param str  name of the table holding the expected rows
     * @param str2 name of the table produced by the action under test
     */
    private void validateTables(String str, String str2) throws NoSuchTableException, ParseException {
        List<?> expectedRows = spark.table(str).collectAsList();
        Assert.assertEquals("Provider should be iceberg", "iceberg", loadTable(str2).properties().get(TableCatalog.PROP_PROVIDER));
        List<?> actualRows = spark.table(str2).collectAsList();
        Assert.assertTrue(
            String.format("Rows in migrated table did not match\nExpected :%s rows \nFound    :%s", expectedRows, actualRows),
            sameRowsWithMultiplicity(expectedRows, actualRows));
    }

    /** Returns true when both lists contain equal elements with equal counts, ignoring order. */
    private static boolean sameRowsWithMultiplicity(List<?> expected, List<?> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        // Pair each expected row with a distinct actual row using equals() only. This deliberately
        // avoids relying on Row.hashCode, whose consistency with equals() for every value type
        // (e.g. binary columns) is not guaranteed. NOTE(review): confirm if switching to hashing.
        List<Object> unmatched = Lists.newArrayList(actual);
        for (Object row : expected) {
            if (!unmatched.remove(row)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Counts the files on disk under the session-catalog table {@code str}.
     *
     * <p>For an unpartitioned table the table location is scanned recursively. For a partitioned
     * table, every partition location reported by the session catalog is scanned. Files whose path
     * ends in {@code crc} or contains {@code _SUCCESS} are excluded.
     *
     * @param str table name resolvable through the session catalog
     * @return number of remaining files
     */
    private long expectedFilesCount(String str) throws NoSuchDatabaseException, NoSuchTableException, ParseException {
        CatalogTable table = loadSessionTable(str);
        List<File> dataDirs;
        if (table.partitionColumnNames().isEmpty()) {
            dataDirs = Collections.singletonList(Paths.get(table.location()).toFile());
        } else {
            // Option.empty(): no partial partition spec, so every partition is listed.
            dataDirs = JavaConverters.seqAsJavaList(
                    spark.sessionState().catalog().listPartitions(table.identifier(), Option.empty()))
                .stream()
                .map(partition -> Paths.get(partition.location()).toFile())
                .collect(Collectors.toList());
        }
        return dataDirs.stream()
            .flatMap(dir -> FileUtils.listFiles(dir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).stream())
            .filter(file -> {
                String path = file.toString();
                // Checksum and _SUCCESS marker files are not data files, so they are not counted.
                return !path.endsWith("crc") && !path.contains("_SUCCESS");
            })
            .count();
    }

    /**
     * Appends one row to the snapshot {@code str2}. It then checks that the source {@code str} did
     * not change size and that the appended row is visible in the snapshot.
     *
     * @param str  source table name
     * @param str2 snapshot table name
     */
    private void assertIsolatedSnapshot(String str, String str2) {
        String selectSource = String.format("SELECT * FROM %s", str);
        int sourceRowsBefore = spark.sql(selectSource).collectAsList().size();

        List<SimpleRecord> extraRecords = Lists.newArrayList(new SimpleRecord(4, "d"));
        spark.createDataFrame(extraRecords, SimpleRecord.class)
            .write()
            .format("iceberg")
            .mode("append")
            .saveAsTable(str2);

        int sourceRowsAfter = spark.sql(selectSource).collectAsList().size();
        Assert.assertEquals("No additional rows should be added to the original table", sourceRowsBefore, sourceRowsAfter);

        String selectAdded = String.format("SELECT * FROM %s WHERE id = 4 AND data = 'd'", str2);
        int matchingRows = spark.sql(selectAdded).collectAsList().size();
        Assert.assertEquals("Added row not found in snapshot", 1L, matchingRows);
    }

    /**
     * Builds a source table name in the {@code default} database whose table part combines the
     * catalog name, the catalog type, and {@code str}.
     *
     * @param str test-specific suffix
     * @return {@code default.<catalog>_<type>_<str>}
     */
    private String sourceName(String str) {
        return String.format("default.%s_%s_%s", this.catalog.name(), this.type, str);
    }

    /**
     * Builds a destination table name for the catalog under test.
     *
     * <p>For {@code spark_catalog} the name is left unqualified by catalog. For any other catalog
     * it is prefixed with the catalog name.
     *
     * @param str test-specific suffix
     * @return {@code default.<catalog>_<type>_<str>}, optionally prefixed by {@code <catalog>.}
     */
    private String destName(String str) {
        String catalogName = this.catalog.name();
        String tableName = "default." + catalogName + "_" + this.type + "_" + str;
        if (catalogName.equals("spark_catalog")) {
            return tableName;
        }
        return catalogName + "." + tableName;
    }
}
