package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.internal.SQLConf;
import org.jetbrains.annotations.NotNull;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Runs the shared {@link DeleteReadTests} suite against Spark reads of Iceberg tables stored in a
 * Hive catalog. The suite runs once with Parquet vectorization disabled and once with it enabled.
 *
 * <p>Also covers Spark-specific cases: filtering on top of equality deletes, reading back the
 * rows matched by equality deletes via {@link EqualityDeleteRowReader}, and projecting or
 * filtering on the {@code _deleted} metadata column ({@link MetadataColumns#IS_DELETED}).
 */
@RunWith(Parameterized.class)
public class TestSparkReaderDeletes extends DeleteReadTests {
    // Split-planning arguments handed to TableScanUtil.planTasks in testReadEqualityDeleteRows.
    private static final long SPLIT_SIZE = 32L * 1024 * 1024;
    private static final int SPLIT_LOOKBACK = 10;
    private static final long SPLIT_OPEN_FILE_COST = 4L * 1024 * 1024;

    /** Projection used by the {@code _deleted} tests: id, data and the is-deleted metadata column. */
    private static final Schema PROJECTION_SCHEMA = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()),
            MetadataColumns.IS_DELETED);

    private static TestHiveMetastore metastore = null;
    protected static SparkSession spark = null;
    protected static HiveCatalog catalog = null;

    /** Whether tables created by this run enable Parquet vectorized reads. */
    private final boolean vectorized;

    public TestSparkReaderDeletes(boolean vectorized) {
        this.vectorized = vectorized;
    }

    @Parameterized.Parameters(name = "vectorized = {0}")
    public static Object[] parameters() {
        return new Object[] {false, true};
    }

    /**
     * Starts an embedded Hive metastore and a local Spark session wired to it. Also loads the
     * Iceberg Hive catalog and ensures the {@code default} namespace exists.
     */
    @BeforeClass
    public static void startMetastoreAndSpark() {
        metastore = new TestHiveMetastore();
        metastore.start();
        HiveConf hiveConf = metastore.hiveConf();

        spark = SparkSession.builder()
                .master("local[2]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + HiveConf.ConfVars.METASTOREURIS.varname,
                        hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
                .enableHiveSupport()
                .getOrCreate();

        catalog = (HiveCatalog) CatalogUtil.loadCatalog(
                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);

        try {
            catalog.createNamespace(Namespace.of("default"));
        } catch (AlreadyExistsException ignored) {
            // The namespace surviving from an earlier run is fine; the tests only need it to exist.
        }
    }

    /**
     * Stops the metastore and the Spark session.
     *
     * <p>Null-safe because JUnit runs {@code @AfterClass} even when {@code @BeforeClass} failed
     * partway. Spark is stopped in {@code finally} so a failing metastore shutdown cannot leak the
     * session into later test classes.
     */
    @AfterClass
    public static void stopMetastoreAndSpark() throws Exception {
        catalog = null;
        try {
            if (metastore != null) {
                metastore.stop();
            }
        } finally {
            metastore = null;
            SparkSession session = spark;
            spark = null;
            if (session != null) {
                session.stop();
            }
        }
    }

    /**
     * Creates {@code default.<name>} in the Hive catalog and upgrades it to format version 2 (so it
     * can carry delete files). It then sets the Parquet vectorization properties for this run.
     *
     * <p>NOTE(review): {@code spec} is not passed to the catalog, so the table is created
     * unpartitioned. This matches the previous behavior; confirm it is intended.
     */
    @Override
    protected Table createTable(String name, Schema schema, PartitionSpec spec) {
        Table table = catalog.createTable(TableIdentifier.of("default", name), schema);

        TableOperations ops = ((BaseTable) table).operations();
        TableMetadata meta = ops.current();
        ops.commit(meta, meta.upgradeToFormatVersion(2));

        if (vectorized) {
            table.updateProperties()
                    .set("read.parquet.vectorization.enabled", "true")
                    .set("read.parquet.vectorization.batch-size", "4")
                    .commit();
        } else {
            table.updateProperties()
                    .set("read.parquet.vectorization.enabled", "false")
                    .commit();
        }
        return table;
    }

    /** Drops {@code default.<name>} from the Hive catalog. */
    @Override
    protected void dropTable(String name) {
        catalog.dropTable(TableIdentifier.of("default", name));
    }

    /** Reads {@code columns} of {@code default.<name>} through Spark, typed by {@code testTable}'s schema. */
    @Override
    public StructLikeSet rowSet(String name, Table testTable, String... columns) {
        return rowSet(name, testTable.schema().select(columns).asStruct(), columns);
    }

    /**
     * Reads {@code default.<name>} through Spark and selects {@code columns}. Each row is wrapped
     * as a struct of type {@code projection}.
     */
    public StructLikeSet rowSet(String name, Types.StructType projection, String... columns) {
        return toRowSet(loadTable(name).selectExpr(columns), projection);
    }

    @Test
    public void testEqualityDeleteWithFilter() throws IOException {
        String name = unqualifiedTableName();
        Schema deleteSchema = table.schema().select("data");
        GenericRecord template = GenericRecord.create(deleteSchema);
        List<Record> deletes = Lists.newArrayList(
                template.copy("data", "a"),
                template.copy("data", "d"),
                template.copy("data", "g"));

        DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes, deleteSchema);
        table.newRowDelta().addDeletes(eqDeletes).commit();

        Types.StructType projection = table.schema().select("*").asStruct();
        StructLikeSet actual = toRowSet(
                loadTable(name).filter("data = 'a'").selectExpr("*"), projection);

        Assert.assertEquals("Table should contain no rows", 0L, actual.size());
    }

    @Test
    public void testReadEqualityDeleteRows() throws IOException {
        Schema dataSchema = table.schema().select("data");
        GenericRecord dataTemplate = GenericRecord.create(dataSchema);
        List<Record> dataDeletes = Lists.newArrayList(
                dataTemplate.copy("data", "a"),
                dataTemplate.copy("data", "d"));

        Schema idSchema = table.schema().select("id");
        GenericRecord idTemplate = GenericRecord.create(idSchema);
        List<Record> idDeletes = Lists.newArrayList(
                idTemplate.copy("id", 121),
                idTemplate.copy("id", 122));

        DeleteFile dataDeleteFile = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema);
        DeleteFile idDeleteFile = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), idDeletes, idSchema);
        table.newRowDelta()
                .addDeletes(dataDeleteFile)
                .addDeletes(idDeleteFile)
                .commit();

        StructLikeSet expected = rowSetWithIds(new int[] {29, 89, 121, 122});
        StructLikeSet actual = StructLikeSet.create(table.schema().asStruct());

        // Both the planned task iterable and each per-task reader hold open resources, so both are
        // closed via try-with-resources.
        try (CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
                table.newScan().planFiles(), SPLIT_SIZE, SPLIT_LOOKBACK, SPLIT_OPEN_FILE_COST)) {
            for (CombinedScanTask task : tasks) {
                try (EqualityDeleteRowReader reader =
                        new EqualityDeleteRowReader(task, table, table.schema(), false)) {
                    while (reader.next()) {
                        // copy(): the reader's current row is not assumed to stay valid after next().
                        actual.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
                                .wrap(reader.get().copy()));
                    }
                }
            }
        }

        Assert.assertEquals("should include 4 deleted row", 4L, actual.size());
        Assert.assertEquals("deleted row should be matched", expected, actual);
    }

    @Test
    public void testPosDeletesAllRowsInBatch() throws IOException {
        commitPositionDeletes(0L, 1L, 2L, 3L);

        Assert.assertEquals("Table should contain expected rows",
                rowSetWithoutIds(table, records, new int[] {29, 43, 61, 89}),
                rowSet(tableName, table, "*"));
    }

    @Test
    public void testPosDeletesWithDeletedColumn() throws IOException {
        Assume.assumeFalse(vectorized);

        commitPositionDeletes(0L, 1L, 2L, 3L);

        Assert.assertEquals("Table should contain expected row",
                expectedRowSet(29, 43, 61, 89),
                rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"));
    }

    @Test
    public void testEqualityDeleteWithDeletedColumn() throws IOException {
        Assume.assumeFalse(vectorized);

        String name = unqualifiedTableName();
        Schema deleteSchema = table.schema().select("data");
        GenericRecord template = GenericRecord.create(deleteSchema);
        List<Record> deletes = Lists.newArrayList(
                template.copy("data", "a"),
                template.copy("data", "d"),
                template.copy("data", "g"));

        DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes, deleteSchema);
        table.newRowDelta().addDeletes(eqDeletes).commit();

        Assert.assertEquals("Table should contain expected row",
                expectedRowSet(29, 89, 122),
                rowSet(name, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"));
    }

    @Test
    public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
        Assume.assumeFalse(vectorized);

        Schema deleteSchema = table.schema().select("data");
        GenericRecord template = GenericRecord.create(deleteSchema);
        List<Record> eqDeleteRows = Lists.newArrayList(
                template.copy("data", "a"),
                template.copy("data", "d"),
                template.copy("data", "g"));
        DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), eqDeleteRows, deleteSchema);

        List<Pair<CharSequence, Long>> positions = Lists.newArrayList(
                Pair.of(dataFile.path(), 3L),
                Pair.of(dataFile.path(), 5L));
        Pair<DeleteFile, ? extends Iterable<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), positions);

        // Equality and position deletes go into a single RowDelta commit.
        table.newRowDelta()
                .addDeletes(eqDeletes)
                .addDeletes(posDeletes.first())
                .validateDataFilesExist(posDeletes.second())
                .commit();

        Assert.assertEquals("Table should contain expected row",
                expectedRowSet(29, 89, 121, 122),
                rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"));
    }

    @Test
    public void testFilterOnDeletedMetadataColumn() throws IOException {
        Assume.assumeFalse(vectorized);

        commitPositionDeletes(0L, 1L, 2L, 3L);
        Types.StructType projection = PROJECTION_SCHEMA.asStruct();

        StructLikeSet expectedLive = expectedRowSetWithNonDeletesOnly(29, 43, 61, 89);
        StructLikeSet actualLive = toRowSet(
                loadTable(tableName).select("id", "data", "_deleted").filter("_deleted = false"),
                projection);
        Assert.assertEquals("Table should contain expected row", expectedLive, actualLive);

        StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89);
        StructLikeSet actualDeleted = toRowSet(
                loadTable(tableName).select("id", "data", "_deleted").filter("_deleted = true"),
                projection);
        Assert.assertEquals("Table should contain expected row", expectedDeleted, actualDeleted);
    }

    @Test
    public void testIsDeletedColumnWithoutDeleteFile() {
        Assume.assumeFalse(vectorized);

        Assert.assertEquals("Table should contain expected row",
                expectedRowSet(),
                rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"));
    }

    /** Returns the last dot-separated component of {@code table.name()}. */
    private String unqualifiedTableName() {
        String fullName = table.name();
        return fullName.substring(fullName.lastIndexOf('.') + 1);
    }

    /** Loads {@code default.<name>} through Spark's {@code iceberg} data source. */
    private static Dataset<Row> loadTable(String name) {
        return spark.read().format("iceberg").load(TableIdentifier.of("default", name).toString());
    }

    /** Collects {@code df} to the driver and adds each row, wrapped as {@code projection}, to a new set. */
    private static StructLikeSet toRowSet(Dataset<Row> df, Types.StructType projection) {
        StructLikeSet result = StructLikeSet.create(projection);
        for (Row row : df.collectAsList()) {
            // A fresh wrapper per row, as before. NOTE(review): sharing one is only safe if wrap()
            // does not rebind the wrapper in place.
            result.add(new SparkStructLike(projection).wrap(row));
        }
        return result;
    }

    /**
     * Writes a position-delete file against {@code dataFile} for the given row positions. It then
     * commits it in its own RowDelta, validating that the referenced data files exist.
     */
    private void commitPositionDeletes(long... positions) throws IOException {
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
        for (long position : positions) {
            deletes.add(Pair.of(dataFile.path(), position));
        }

        Pair<DeleteFile, ? extends Iterable<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
                table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
        table.newRowDelta()
                .addDeletes(posDeletes.first())
                .validateDataFilesExist(posDeletes.second())
                .commit();
    }

    /** All projection rows, with {@code _deleted = true} for the given ids. */
    private static StructLikeSet expectedRowSet(int... deletedIds) {
        return buildExpectedRowSet(false, false, deletedIds);
    }

    /** Only the rows whose id is in {@code deletedIds}, each with {@code _deleted = true}. */
    private static StructLikeSet expectedRowSetWithDeletesOnly(int... deletedIds) {
        return buildExpectedRowSet(false, true, deletedIds);
    }

    /** Only the rows whose id is NOT in {@code deletedIds}, each with {@code _deleted = false}. */
    private static StructLikeSet expectedRowSetWithNonDeletesOnly(int... deletedIds) {
        return buildExpectedRowSet(true, false, deletedIds);
    }

    /**
     * Builds the expected {@link #PROJECTION_SCHEMA} rows. Rows whose id is in {@code deletedIds}
     * are marked {@code _deleted = true}.
     *
     * @param nonDeletesOnly drop the rows marked deleted
     * @param deletesOnly drop the rows not marked deleted
     * @param deletedIds ids to mark as deleted
     */
    private static StructLikeSet buildExpectedRowSet(
            boolean nonDeletesOnly, boolean deletesOnly, int... deletedIds) {
        Set<Integer> deleted = Sets.newHashSet(ArrayUtil.toIntList(deletedIds));
        List<Record> records = recordsWithDeletedColumn();

        records.forEach(record -> {
            if (deleted.contains(record.getField("id"))) {
                record.setField(MetadataColumns.IS_DELETED.name(), true);
            }
        });
        records.removeIf(record ->
                deleted.contains(record.getField("id")) ? nonDeletesOnly : deletesOnly);

        Types.StructType projection = PROJECTION_SCHEMA.asStruct();
        StructLikeSet expected = StructLikeSet.create(projection);
        for (Record record : records) {
            expected.add(new InternalRecordWrapper(projection).wrap(record));
        }
        return expected;
    }

    /** Fresh, mutable copies of the seven test rows with {@code _deleted = false}. */
    @NotNull
    private static List<Record> recordsWithDeletedColumn() {
        GenericRecord template = GenericRecord.create(PROJECTION_SCHEMA);
        List<Record> records = Lists.newArrayList();
        records.add(template.copy("id", 29, "data", "a", "_deleted", false));
        records.add(template.copy("id", 43, "data", "b", "_deleted", false));
        records.add(template.copy("id", 61, "data", "c", "_deleted", false));
        records.add(template.copy("id", 89, "data", "d", "_deleted", false));
        records.add(template.copy("id", 100, "data", "e", "_deleted", false));
        records.add(template.copy("id", 121, "data", "f", "_deleted", false));
        records.add(template.copy("id", 122, "data", "g", "_deleted", false));
        return records;
    }
}
