package org.apache.iceberg.spark.source;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.class */
public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
    /** Schema shared by the test tables: optional int {@code id} (field 1) and optional string {@code data} (field 2). */
    private static final Schema SCHEMA = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** JUnit-managed scratch directory; tests here use it as the location of a temporary Parquet table. */
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * Creates the base Iceberg table that a test writes to. Implemented by concrete subclasses.
     *
     * @param tableIdentifier identifier of the table to create
     * @param schema schema of the new table
     * @param partitionSpec partition spec of the new table
     * @return the created table
     */
    public abstract Table createTable(TableIdentifier tableIdentifier, Schema schema, PartitionSpec partitionSpec);

    /**
     * Loads a metadata table of an existing base table.
     *
     * @param tableIdentifier identifier of the base table
     * @param str metadata table name as used by the tests, e.g. {@code "entries"}, {@code "files"}, {@code "history"}
     * @return the metadata table
     */
    public abstract Table loadTable(TableIdentifier tableIdentifier, String str);

    /**
     * Returns the string the tests pass to {@code spark.read().format("iceberg").load(...)} to read a metadata table.
     *
     * @param tableIdentifier identifier of the base table
     * @param str metadata table name, e.g. {@code "entries"} or {@code "all_data_files"}
     */
    public abstract String loadLocation(TableIdentifier tableIdentifier, String str);

    /**
     * Returns the string the tests pass to the {@code "iceberg"} Spark format to write and read the base table.
     *
     * @param tableIdentifier identifier of the base table
     */
    public abstract String loadLocation(TableIdentifier tableIdentifier);

    /**
     * Round-trips three records through the Iceberg data source: appends them to an unpartitioned
     * table and checks they read back, ordered by {@code id}, as the same beans.
     */
    @Test
    public synchronized void testTablesSupport() {
        TableIdentifier tableId = TableIdentifier.of("db", "table");
        createTable(tableId, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "1"),
            new SimpleRecord(2, "2"),
            new SimpleRecord(3, "3"));

        Dataset<Row> input = spark.createDataFrame(expected, SimpleRecord.class);
        input.select("id", "data")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableId));

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableId))
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
        Assert.assertEquals("Records should match", expected, actual);
    }

    /**
     * Verifies that the {@code entries} metadata table matches the entries read directly from the
     * table's single manifest after one append.
     *
     * <p>Rows read straight from the manifest file are patched before comparison: position 2 is set
     * to {@code 0L} (presumably the sequence number column — confirm against the entries schema) and
     * position 0 of the nested {@code data_file} record is set to the {@link FileContent#DATA} id.
     */
    @Test
    public void testEntriesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "entries");

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        table.refresh();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "entries"))
            .collectAsList();

        Snapshot snapshot = table.currentSnapshot();
        List<ManifestFile> manifests = snapshot.allManifests();
        Assert.assertEquals("Should only contain one manifest", 1, manifests.size());

        InputFile manifest = table.io().newInputFile(manifests.get(0).path());
        List<GenericData.Record> expected = Lists.newArrayList();
        // try-with-resources closes the reader exactly once, also when reading fails; the assertions
        // below run after the reader is closed, so a failing assertion does not trigger another close().
        try (AvroIterable<GenericData.Record> rows =
                 Avro.read(manifest).project(entriesTable.schema()).build()) {
            for (GenericData.Record row : rows) {
                row.put(2, 0L);
                GenericData.Record file = (GenericData.Record) row.get("data_file");
                file.put(0, FileContent.DATA.id());
                expected.add(row);
            }
        }

        Assert.assertEquals("Entries table should have one row", 1, expected.size());
        Assert.assertEquals("Actual results should have one row", 1, actual.size());
        TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
    }

    /**
     * Verifies that the {@code all_entries} metadata table lists the entries of every snapshot's
     * manifests, not only the live ones: an append, a row-filter delete of {@code id = 1}, and a
     * second append are committed, then three entries are expected.
     *
     * <p>Expected rows are read directly from all manifests of all snapshots, patched at position 2
     * ({@code 0L}) and at position 0 of {@code data_file} ({@link FileContent#DATA} id), and sorted
     * by {@code snapshot_id} to line up with the Spark result, which is ordered the same way.
     */
    @Test
    public void testAllEntriesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "all_entries");

        Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);

        df1.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        // delete the first file so that the metadata table must also report non-live entries
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

        df2.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        table.refresh();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_entries"))
            .orderBy("snapshot_id")
            .collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::allManifests))) {
            InputFile in = table.io().newInputFile(manifest.path());
            // try-with-resources closes each reader exactly once, even if reading or closing fails
            try (AvroIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record row : rows) {
                    row.put(2, 0L);
                    GenericData.Record file = (GenericData.Record) row.get("data_file");
                    file.put(0, FileContent.DATA.id());
                    expected.add(row);
                }
            }
        }

        expected.sort(Comparator.comparing((GenericData.Record row) -> (Long) row.get("snapshot_id")));

        Assert.assertEquals("Entries table should have 3 rows", 3, expected.size());
        Assert.assertEquals("Actual results should have 3 rows", 3, actual.size());
        for (int i = 0; i < expected.size(); i += 1) {
            TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Checks that {@code count()} over the {@code entries} and the {@code all_entries} metadata
     * tables reports one row after a single one-record append.
     */
    @Test
    public void testCountEntriesTable() {
        TableIdentifier tableId = TableIdentifier.of("db", "count_entries_test");
        createTable(tableId, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> input = spark.createDataFrame(records, SimpleRecord.class);
        input.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableId));

        // same check, in the same order, for both metadata tables
        for (String metadataTable : new String[] {"entries", "all_entries"}) {
            long rowCount = spark.read()
                .format("iceberg")
                .load(loadLocation(tableId, metadataTable))
                .count();
            Assert.assertEquals("Count should return 1", 1L, rowCount);
        }
    }

    /**
     * Verifies that the {@code files} metadata table of a table partitioned by {@code id} lists only
     * the data file that remains after two appends and a row-filter delete of {@code id = 1}.
     *
     * <p>Expected rows come from reading the current snapshot's data manifests directly. Only
     * entries whose {@code status} is below 2 are kept (presumably EXISTING/ADDED, with 2 meaning
     * DELETED — confirm against the manifest entry status codes). Position 0 of each
     * {@code data_file} record is patched to the {@link FileContent#DATA} id.
     */
    @Test
    public void testFilesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        Table entriesTable = loadTable(tableIdentifier, "entries");
        Table filesTable = loadTable(tableIdentifier, "files");

        Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));
        df2.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        // removes the file written by df1
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "files"))
            .collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
            InputFile in = table.io().newInputFile(manifest.path());
            // try-with-resources closes each reader exactly once, even if reading or closing fails
            try (AvroIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record entry : rows) {
                    if (((Integer) entry.get("status")) < 2) {
                        GenericData.Record file = (GenericData.Record) entry.get("data_file");
                        file.put(0, FileContent.DATA.id());
                        expected.add(file);
                    }
                }
            }
        }

        Assert.assertEquals("Files table should have one row", 1, expected.size());
        Assert.assertEquals("Actual results should have one row", 1, actual.size());
        TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
    }

    /**
     * Verifies the {@code files} metadata table after importing a two-partition Spark Parquet table
     * into an Iceberg table that has snapshot-id inheritance enabled: both imported data files must
     * be listed and match the {@code data_file} records read directly from the data manifests
     * (position 0 patched to the {@link FileContent#DATA} id).
     *
     * <p>The temporary {@code parquet_table} is dropped exactly once in a {@code finally} block,
     * including when inserting into it fails.
     */
    @Test
    public void testFilesTableWithSnapshotIdInheritance() throws Exception {
        spark.sql("DROP TABLE IF EXISTS parquet_table");

        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        table.updateProperties().set("compatibility.snapshot-id-inheritance.enabled", "true").commit();
        Table entriesTable = loadTable(tableIdentifier, "entries");
        Table filesTable = loadTable(tableIdentifier, "files");

        spark.sql(String.format(
            "CREATE TABLE parquet_table (data string, id int) USING parquet PARTITIONED BY (id) LOCATION '%s'",
            temp.newFolder()));
        try {
            List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
            Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
            inputDf.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

            String stagingLocation = table.location() + "/metadata";
            SparkTableUtil.importSparkTable(
                spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation);

            List<Row> actual = spark.read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "files"))
                .collectAsList();

            List<GenericData.Record> expected = Lists.newArrayList();
            for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
                InputFile in = table.io().newInputFile(manifest.path());
                // try-with-resources closes the reader even if iteration throws
                try (AvroIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
                    for (GenericData.Record entry : rows) {
                        GenericData.Record file = (GenericData.Record) entry.get("data_file");
                        file.put(0, FileContent.DATA.id());
                        expected.add(file);
                    }
                }
            }

            Assert.assertEquals("Files table should have two rows", 2, expected.size());
            Assert.assertEquals("Actual results should have two rows", 2, actual.size());
            TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
            TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
        } finally {
            spark.sql("DROP TABLE parquet_table");
        }
    }

    /**
     * Verifies that entries imported from a Spark Parquet table, with snapshot-id inheritance
     * enabled, report sequence number {@code 0} and the snapshot id of the table's current
     * snapshot after the import.
     *
     * <p>The temporary {@code parquet_table} is dropped exactly once in a {@code finally} block,
     * including when inserting into it fails.
     */
    @Test
    public void testEntriesTableWithSnapshotIdInheritance() throws Exception {
        spark.sql("DROP TABLE IF EXISTS parquet_table");

        TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        table.updateProperties().set("compatibility.snapshot-id-inheritance.enabled", "true").commit();

        spark.sql(String.format(
            "CREATE TABLE parquet_table (data string, id int) USING parquet PARTITIONED BY (id) LOCATION '%s'",
            temp.newFolder()));
        try {
            List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
            Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
            inputDf.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

            String stagingLocation = table.location() + "/metadata";
            SparkTableUtil.importSparkTable(
                spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation);

            List<Row> actual = spark.read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "entries"))
                .select("sequence_number", "snapshot_id", "data_file")
                .collectAsList();

            table.refresh();
            long snapshotId = table.currentSnapshot().snapshotId();

            Assert.assertEquals("Entries table should have 2 rows", 2, actual.size());
            for (Row row : actual) {
                Assert.assertEquals("Sequence number must match", 0, row.getLong(0));
                Assert.assertEquals("Snapshot id must match", snapshotId, row.getLong(1));
            }
        } finally {
            spark.sql("DROP TABLE parquet_table");
        }
    }

    /**
     * Verifies that the {@code files} metadata table of an unpartitioned table lists only the data
     * file that remains after two appends and an explicit delete of the first appended file.
     *
     * <p>Expected rows come from reading the current snapshot's data manifests directly. Only
     * entries whose {@code status} is below 2 are kept (presumably EXISTING/ADDED, with 2 meaning
     * DELETED — confirm against the manifest entry status codes). Position 0 of each
     * {@code data_file} record is patched to the {@link FileContent#DATA} id.
     */
    @Test
    public void testFilesUnpartitionedTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "entries");
        Table filesTable = loadTable(tableIdentifier, "files");

        Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        table.refresh();
        DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());

        df2.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        table.newDelete().deleteFile(toDelete).commit();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "files"))
            .collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
            InputFile in = table.io().newInputFile(manifest.path());
            // try-with-resources closes each reader exactly once, even if reading or closing fails
            try (AvroIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record entry : rows) {
                    if (((Integer) entry.get("status")) < 2) {
                        GenericData.Record file = (GenericData.Record) entry.get("data_file");
                        file.put(0, FileContent.DATA.id());
                        expected.add(file);
                    }
                }
            }
        }

        Assert.assertEquals("Files table should have one row", 1, expected.size());
        Assert.assertEquals("Actual results should have one row", 1, actual.size());
        TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
    }

    /**
     * Verifies that {@code all_data_files}, {@code all_manifests} and {@code all_entries} include
     * snapshots that were staged (WAP enabled via {@code write.wap.enabled} and
     * {@code spark.wap.id}) but never made current.
     *
     * <p>The {@code spark.wap.id} session setting is removed in a {@code finally} block. The
     * {@code spark} session comes from {@code SparkTestBase} and is presumably shared across tests,
     * so leaving the id set would leak into later tests.
     */
    @Test
    public void testAllMetadataTablesWithStagedCommits() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());

        table.updateProperties().set("write.wap.enabled", "true").commit();
        spark.conf().set("spark.wap.id", "1234567");
        try {
            Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
            Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

            df1.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));
            df2.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

            List<Row> actualAllData = spark.read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "all_data_files"))
                .collectAsList();
            List<Row> actualAllManifests = spark.read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "all_manifests"))
                .collectAsList();
            List<Row> actualAllEntries = spark.read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "all_entries"))
                .collectAsList();

            Assert.assertTrue("Stage table should have some snapshots", table.snapshots().iterator().hasNext());
            Assert.assertNull("Stage table should have null currentSnapshot", table.currentSnapshot());
            Assert.assertEquals("Actual results should have two rows", 2, actualAllData.size());
            Assert.assertEquals("Actual results should have two rows", 2, actualAllManifests.size());
            Assert.assertEquals("Actual results should have two rows", 2, actualAllEntries.size());
        } finally {
            spark.conf().unset("spark.wap.id");
        }
    }

    /**
     * Verifies that {@code all_data_files} lists data files from every snapshot, including a file
     * removed by a later row-filter delete. The test runs an append, a delete of {@code id = 1},
     * and a second append, then expects two files.
     *
     * <p>Expected rows come from the data manifests of all snapshots. Only entries whose
     * {@code status} is below 2 are kept (presumably EXISTING/ADDED — confirm against the manifest
     * entry status codes), and position 0 of {@code data_file} is patched to the
     * {@link FileContent#DATA} id. Both sides are sorted by file path so they line up index by index.
     */
    @Test
    public void testAllDataFilesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        Table entriesTable = loadTable(tableIdentifier, "entries");
        Table filesTable = loadTable(tableIdentifier, "all_data_files");

        Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        // delete the first file so that only the "all" table still reports it
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

        df2.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));

        table.refresh();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_data_files"))
            .orderBy("file_path")
            .collectAsList();
        // re-sort in Java so both sides use the same String ordering as the expected list
        actual.sort(Comparator.comparing((Row row) -> row.getString(1)));

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
            InputFile in = table.io().newInputFile(manifest.path());
            // try-with-resources closes each reader exactly once, even if reading or closing fails
            try (AvroIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record entry : rows) {
                    if (((Integer) entry.get("status")) < 2) {
                        GenericData.Record file = (GenericData.Record) entry.get("data_file");
                        file.put(0, FileContent.DATA.id());
                        expected.add(file);
                    }
                }
            }
        }

        expected.sort(Comparator.comparing((GenericData.Record file) -> file.get("file_path").toString()));

        Assert.assertEquals("Files table should have two rows", 2, expected.size());
        Assert.assertEquals("Actual results should have two rows", 2, actual.size());
        for (int i = 0; i < expected.size(); i += 1) {
            TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Verifies the {@code history} metadata table through two appends, a rollback to the first
     * snapshot, and a third append. Expects four rows, one per change of the current snapshot,
     * and compares every one of them.
     *
     * <p>Expected {@code made_current_at} values are the millisecond timestamps scaled by 1000. The
     * rollback creates no snapshot, so its timestamp is taken from the last history entry.
     */
    @Test
    public void testHistoryTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "history_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table historyTable = loadTable(tableIdentifier, "history");

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);

        inputDf.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));
        table.refresh();
        long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
        long firstSnapshotId = table.currentSnapshot().snapshotId();

        inputDf.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));
        table.refresh();
        long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
        long secondSnapshotId = table.currentSnapshot().snapshotId();

        // roll back to the first snapshot; this adds a history entry but no new snapshot
        table.rollback().toSnapshotId(firstSnapshotId).commit();
        HistoryEntry rollbackEntry = Iterables.getLast(table.history());
        long rollbackTimestamp = rollbackEntry.timestampMillis();

        inputDf.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableIdentifier));
        table.refresh();
        long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
        long thirdSnapshotId = table.currentSnapshot().snapshotId();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "history"))
            .collectAsList();

        GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
        List<GenericData.Record> expected = Lists.newArrayList(
            builder.set("made_current_at", firstSnapshotTimestamp * 1000)
                .set("snapshot_id", firstSnapshotId)
                .set("parent_id", null)
                .set("is_current_ancestor", true)
                .build(),
            builder.set("made_current_at", secondSnapshotTimestamp * 1000)
                .set("snapshot_id", secondSnapshotId)
                .set("parent_id", firstSnapshotId)
                .set("is_current_ancestor", false) // rolled back, no longer an ancestor
                .build(),
            builder.set("made_current_at", rollbackTimestamp * 1000)
                .set("snapshot_id", firstSnapshotId)
                .set("parent_id", null)
                .set("is_current_ancestor", true)
                .build(),
            builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
                .set("snapshot_id", thirdSnapshotId)
                .set("parent_id", firstSnapshotId)
                .set("is_current_ancestor", true)
                .build());

        Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
        // compare every expected row, including the one for the append after the rollback
        for (int i = 0; i < expected.size(); i += 1) {
            TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Verifies the {@code snapshots} metadata table after an append and a full delete, with the
     * delete later rolled back. Both snapshots must still be listed with their commit time (scaled
     * by 1000), parent id, operation, manifest list location, and summary.
     */
    @Test
    public void testSnapshotsTable() {
        TableIdentifier tableId = TableIdentifier.of("db", "snapshots_test");
        Table table = createTable(tableId, SCHEMA, PartitionSpec.unpartitioned());
        Table snapTable = loadTable(tableId, "snapshots");

        Dataset<Row> input = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);
        input.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(tableId));

        table.refresh();
        Snapshot appendSnapshot = table.currentSnapshot();
        long appendMillis = appendSnapshot.timestampMillis();
        long appendId = appendSnapshot.snapshotId();
        String appendManifestList = appendSnapshot.manifestListLocation();

        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

        Snapshot deleteSnapshot = table.currentSnapshot();
        long deleteMillis = deleteSnapshot.timestampMillis();
        long deleteId = deleteSnapshot.snapshotId();
        String deleteManifestList = deleteSnapshot.manifestListLocation();

        // undo the delete; the delete snapshot must still appear in the metadata table
        table.rollback().toSnapshotId(appendId).commit();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableId, "snapshots"))
            .collectAsList();

        GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
        GenericData.Record appendRow = builder
            .set("committed_at", appendMillis * 1000)
            .set("snapshot_id", appendId)
            .set("parent_id", null)
            .set("operation", "append")
            .set("manifest_list", appendManifestList)
            .set("summary", ImmutableMap.of(
                "added-records", "1",
                "added-data-files", "1",
                "changed-partition-count", "1",
                "total-data-files", "1",
                "total-records", "1"))
            .build();
        GenericData.Record deleteRow = builder
            .set("committed_at", deleteMillis * 1000)
            .set("snapshot_id", deleteId)
            .set("parent_id", appendId)
            .set("operation", "delete")
            .set("manifest_list", deleteManifestList)
            .set("summary", ImmutableMap.of(
                "deleted-records", "1",
                "deleted-data-files", "1",
                "changed-partition-count", "1",
                "total-records", "0",
                "total-data-files", "0"))
            .build();
        List<GenericData.Record> expected = Lists.newArrayList(appendRow, deleteRow);

        Assert.assertEquals("Snapshots table should have a row for each snapshot", 2L, actual.size());
        for (int i = 0; i < 2; i++) {
            TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Checks that the {@code manifests} metadata table exposes exactly one row for the single
     * manifest written by one append into an identity-partitioned table.
     */
    @Test
    public void testManifestsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = createTable(tableIdentifier, SCHEMA, spec);
        Table manifestsTable = loadTable(tableIdentifier, "manifests");

        Dataset<Row> inputDf = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "manifests"))
            .collectAsList();

        // The append went through Spark, so make this Table instance see the new snapshot.
        table.refresh();

        GenericRecordBuilder manifestBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(manifestsTable.schema(), "manifests"));
        GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
            manifestsTable.schema().findType("partition_summaries.element").asStructType(),
            "partition_summary"));

        // Lazy view: each expected record is built from the shared builders when it is accessed.
        List<GenericData.Record> expected = Lists.transform(
            table.currentSnapshot().allManifests(),
            manifest -> manifestBuilder
                .set("path", manifest.path())
                .set("length", manifest.length())
                .set("partition_spec_id", manifest.partitionSpecId())
                .set("added_snapshot_id", manifest.snapshotId())
                .set("added_data_files_count", manifest.addedFilesCount())
                .set("existing_data_files_count", manifest.existingFilesCount())
                .set("deleted_data_files_count", manifest.deletedFilesCount())
                .set("partition_summaries", Lists.transform(
                    manifest.partitions(),
                    summary -> summaryBuilder
                        .set("contains_null", false)
                        .set("lower_bound", "1")
                        .set("upper_bound", "1")
                        .build()))
                .build());

        Assert.assertEquals("Manifests table should have one manifest row", 1L, actual.size());
        TestHelpers.assertEqualsSafe(manifestsTable.schema().asStruct(), expected.get(0), actual.get(0));
    }

    /**
     * Checks that the {@code all_manifests} metadata table reports manifests from every snapshot,
     * not only the current one: the manifests of the append snapshot plus those of the later
     * delete snapshot.
     */
    @Test
    public void testAllManifestsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        Table allManifestsTable = loadTable(tableIdentifier, "all_manifests");
        Dataset<Row> inputDf = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

        // Typed (not raw) so the transform/sort lambdas below see ManifestFile / Record, not Object.
        List<ManifestFile> manifests = Lists.newArrayList();

        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        // The append was committed through Spark, not through this Table instance; refresh
        // explicitly (as the sibling tests do) instead of relying on implicit lazy refresh.
        table.refresh();
        manifests.addAll(table.currentSnapshot().allManifests());

        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        manifests.addAll(table.currentSnapshot().allManifests());

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_manifests"))
            .orderBy("path")
            .collectAsList();

        GenericRecordBuilder manifestBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(allManifestsTable.schema(), "manifests"));
        GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
            allManifestsTable.schema().findType("partition_summaries.element").asStructType(),
            "partition_summary"));

        List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(
            manifests,
            manifest -> manifestBuilder
                .set("path", manifest.path())
                .set("length", manifest.length())
                .set("partition_spec_id", manifest.partitionSpecId())
                .set("added_snapshot_id", manifest.snapshotId())
                .set("added_data_files_count", manifest.addedFilesCount())
                .set("existing_data_files_count", manifest.existingFilesCount())
                .set("deleted_data_files_count", manifest.deletedFilesCount())
                .set("partition_summaries", Lists.transform(
                    manifest.partitions(),
                    summary -> summaryBuilder
                        .set("contains_null", false)
                        .set("lower_bound", "1")
                        .set("upper_bound", "1")
                        .build()))
                .build()));
        // Match the orderBy("path") applied to the actual rows.
        expected.sort(Comparator.comparing(record -> record.get("path").toString()));

        Assert.assertEquals("Manifests table should have two manifest rows", 2L, actual.size());
        Types.StructType allManifestsType = allManifestsTable.schema().asStruct();
        for (int i = 0; i < expected.size(); i += 1) {
            TestHelpers.assertEqualsSafe(allManifestsType, expected.get(i), actual.get(i));
        }
    }

    /**
     * Checks that the {@code partitions} metadata table aggregates one row per identity partition,
     * both at the latest snapshot and when time-travelled to the first append via
     * {@code snapshot-id}.
     */
    @Test
    public void testPartitionsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
        Table partitionsTable = loadTable(tableIdentifier, "partitions");
        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        // Capture the first snapshot so the partitions table can be read as of that commit later.
        table.refresh();
        long firstCommitId = table.currentSnapshot().snapshotId();

        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

        GenericRecordBuilder builder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
        GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(partitionsTable.schema().findType("partition").asStructType(), "partition"));
        List<GenericData.Record> expected = Lists.newArrayList();
        expected.add(builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .build());
        expected.add(builder
            .set("partition", partitionBuilder.set("id", 2).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .build());

        // The former size check on `expected` (a hand-built list) could never fail; only the
        // actual row count is meaningful here.
        Assert.assertEquals("Actual results should have two rows", 2L, actual.size());
        Types.StructType partitionsType = partitionsTable.schema().asStruct();
        for (int i = 0; i < expected.size(); i += 1) {
            TestHelpers.assertEqualsSafe(partitionsType, expected.get(i), actual.get(i));
        }

        List<Row> actualAfterFirstCommit = spark.read()
            .format("iceberg")
            .option("snapshot-id", String.valueOf(firstCommitId))
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

        Assert.assertEquals("Actual results should have one row", 1L, actualAfterFirstCommit.size());
        TestHelpers.assertEqualsSafe(partitionsType, expected.get(0), actualAfterFirstCommit.get(0));
    }

    /**
     * Checks that the remove-orphan-files action deletes a data file written outside Iceberg,
     * leaves metadata alone, and does not affect the rows readable through the table.
     */
    @Test
    public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

        // Tracked data file, committed through Iceberg.
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        // Untracked data file: plain Parquet written straight into the table's data directory.
        df.write().mode("append").parquet(table.location() + "/data");

        // Sleep so the files written above are older than the olderThan(...) cutoffs used below.
        Thread.sleep(1000L);

        // One Actions instance serves both runs (the decompiled code referenced an undefined `r0`).
        Actions actions = Actions.forTable(table);

        Assert.assertTrue("Should not delete any metadata files",
            actions.removeOrphanFiles()
                .location(table.location() + "/metadata")
                .olderThan(System.currentTimeMillis())
                .execute()
                .isEmpty());

        Assert.assertEquals("Should delete 1 data file", 1L,
            actions.removeOrphanFiles()
                .olderThan(System.currentTimeMillis())
                .execute()
                .size());

        List<SimpleRecord> actualRecords = spark.read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier))
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
        Assert.assertEquals("Rows must match", records, actualRecords);
    }
}
