package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.UnmodifiableIterator;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/iceberg/spark/source/TestPositionDeletesTable.class */
public class TestPositionDeletesTable extends CatalogTestBase {
    /** Base table schema: required int {@code id} (field id 1) and required string {@code data} (field id 2). */
    public static final Schema SCHEMA =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));

    /** Catalog configuration handed to every parameter row produced by {@code parameters()}. */
    private static final Map<String, String> CATALOG_PROPS =
        ImmutableMap.of(
            "type", "hive",
            "default-namespace", "default",
            "cache-enabled", "false");

    /** Columns projected by the write tests; {@code delete_file_path} is deliberately absent. */
    private static final List<String> NON_PATH_COLS =
        ImmutableList.of("file_path", "pos", "row", "partition", "spec_id");

    /** File format under test; injected from element 3 of each parameter row. */
    @Parameter(index = 3)
    private FileFormat format;

    /**
     * Supplies the parameter rows for the templated tests.
     *
     * <p>Each row holds the Hive catalog name, the Hive catalog implementation, {@link
     * #CATALOG_PROPS}, and one file format (Parquet, Avro, ORC in that order).
     *
     * <p>NOTE(review): the display-name template uses placeholders {1}..{4} while each row has
     * four elements (indices 0..3) — confirm how ParameterizedTestExtension numbers them.
     *
     * @return one row per file format
     */
    @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}")
    public static Object[][] parameters() {
        // The declared return type is Object[][]; a one-dimensional Object[] initializer does not compile.
        return new Object[][] {
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS,
                FileFormat.PARQUET
            },
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS,
                FileFormat.AVRO
            },
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS,
                FileFormat.ORC
            }
        };
    }

    /**
     * Writes a delete file from bare (path, position) pairs for positions 0 and 1 of a single data
     * file in an unpartitioned table, then checks that reading the position deletes table yields
     * exactly those two deletes.
     */
    @TestTemplate
    public void testNullRows() throws IOException {
        String tableName = "null_rows";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());

        DataFile dataFile = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFile).commit();

        List<Pair<CharSequence, Long>> positions = Lists.newArrayList();
        positions.add(Pair.of(dataFile.path(), 0L));
        positions.add(Pair.of(dataFile.path(), 1L));

        DeleteFile posDeletes =
            FileHelpers.writeDeleteFile(
                    tab,
                    Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                    TestHelpers.Row.of(0),
                    positions)
                .first();
        tab.newRowDelta().addDeletes(posDeletes).commit();

        StructLikeSet actual = actual(tableName, tab);

        List<PositionDelete<?>> expectedDeletes =
            Lists.newArrayList(
                positionDelete(dataFile.path(), 0L), positionDelete(dataFile.path(), 1L));
        StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.path().toString());

        Assertions.assertThat(actual)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expected);
        dropTable(tableName);
    }

    /**
     * Creates a table partitioned by identity({@code data}) with delete files in partitions "a" and
     * "b", then verifies that filtering on {@code row.data='b'} returns only the "b" deletes.
     */
    @TestTemplate
    public void testPartitionedTable() throws IOException {
        String tableName = "partitioned_table";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        StructLikeSet actual = actual(tableName, tab, "row.data='b'");

        // Partition tuple the "b" deletes are expected to carry.
        GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType());
        partitionB.setField("data", "b");
        StructLikeSet expected =
            expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());

        Assertions.assertThat(actual)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expected);
        dropTable(tableName);
    }

    /**
     * Projects {@code row.id}, {@code pos}, {@code delete_file_path} and Spark's
     * {@code input_file_name()} from the position deletes table and compares them, after sorting,
     * against rows derived from the written deletes.
     *
     * <p>Each expected row carries the delete file path twice, so the test also requires the
     * {@code input_file} column to equal the delete file path.
     */
    @TestTemplate
    public void testSelect() throws IOException {
        String tableName = "select";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        List<Object[]> actual =
            rowsToJava(
                spark.read()
                    .format("iceberg")
                    .load("default." + tableName + ".position_deletes")
                    .withColumn("input_file", functions.input_file_name())
                    .select("row.id", "pos", "delete_file_path", "input_file")
                    .collectAsList());

        // Typed function: with a raw BiFunction the lambda parameters would be Object and the
        // calls below would not compile.
        BiFunction<PositionDelete<?>, DeleteFile, Object[]> toRow =
            (delete, file) -> {
                int rowId = delete.get(2, GenericRecord.class).get(0, Integer.class).intValue();
                long pos = delete.get(1, Long.class).longValue();
                String path = file.path().toString();
                return row(Integer.valueOf(rowId), Long.valueOf(pos), path, path);
            };

        List<Object[]> expected = Lists.newArrayList();
        for (PositionDelete<?> delete : deletesA.first()) {
            expected.add(toRow.apply(delete, deletesA.second()));
        }
        for (PositionDelete<?> delete : deletesB.first()) {
            expected.add(toRow.apply(delete, deletesB.second()));
        }

        // Order both sides by row id, then by delete file path, so the comparison is order-independent.
        Comparator<Object[]> rowOrder =
            (left, right) -> {
                int byId = Integer.compare((Integer) left[0], (Integer) right[0]);
                return byId != 0 ? byId : ((String) left[2]).compareTo((String) right[2]);
            };
        actual.sort(rowOrder);
        expected.sort(rowOrder);

        Assertions.assertThat(actual)
            .as("Position Delete table should contain expected rows")
            .usingRecursiveComparison()
            .isEqualTo(expected);
        dropTable(tableName);
    }

    /**
     * Writes 500 records and 500 matching position deletes into an unpartitioned table whose
     * {@code read.split.target-size} is set to 100, then checks how the position deletes scan is
     * split and that all deletes are still read back.
     *
     * <p>For Avro the scan must plan more than one task group; for the other formats exactly one.
     * NOTE(review): presumably only the Avro delete file is splittable here — confirm.
     */
    @TestTemplate
    public void testSplitTasks() throws IOException {
        String tableName = "big_table";
        int records = 500;

        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        tab.updateProperties().set("read.split.target-size", "100").commit();

        GenericRecord template = GenericRecord.create(tab.schema());
        List<Record> dataRecords = Lists.newArrayList();
        for (int i = 0; i < records; i++) {
            dataRecords.add(template.copy("id", Integer.valueOf(i), "data", String.valueOf(i)));
        }
        DataFile dataFile =
            FileHelpers.writeDataFile(
                tab,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of(),
                dataRecords);
        tab.newAppend().appendFile(dataFile).commit();

        // One delete per written record, each carrying a row equal to the record it deletes.
        List<PositionDelete<?>> deletes = Lists.newArrayList();
        for (long pos = 0; pos < records; pos++) {
            deletes.add(
                positionDelete(
                    tab.schema(),
                    dataFile.path(),
                    Long.valueOf(pos),
                    Integer.valueOf((int) pos),
                    String.valueOf(pos)));
        }
        DeleteFile posDeleteFile =
            FileHelpers.writePosDeleteFile(
                tab,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of(0),
                deletes);
        tab.newRowDelta().addDeletes(posDeleteFile).commit();

        Table deleteTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        // planTasks() returns a CloseableIterable; close it instead of leaking it.
        try (CloseableIterable<?> taskGroups = deleteTable.newBatchScan().planTasks()) {
            if (this.format.equals(FileFormat.AVRO)) {
                Assertions.assertThat(taskGroups)
                    .as("Position delete scan should produce more than one split")
                    .hasSizeGreaterThan(1);
            } else {
                Assertions.assertThat(taskGroups)
                    .as("Position delete scan should produce one split")
                    .hasSize(1);
            }
        }

        StructLikeSet actual = actual(tableName, tab);
        StructLikeSet expected = expected(tab, deletes, null, posDeleteFile.path().toString());
        Assertions.assertThat(actual)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expected);
        dropTable(tableName);
    }

    /**
     * With delete files in partitions "a" and "b" of an identity({@code data}) table, checks that
     * an unfiltered read returns both sets of deletes and that
     * {@code partition.data = 'a' AND pos >= 0} returns only the "a" deletes.
     */
    @TestTemplate
    public void testPartitionFilter() throws IOException {
        String tableName = "partition_filter";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);
        Table deletesTab =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        // NOTE(review): the "b" deletes are written against the "a" data file — confirm intentional.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionTemplate.copy("data", "a");
        Record partitionB = partitionTemplate.copy("data", "b");

        StructLikeSet expectedA =
            expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
        StructLikeSet expectedB =
            expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
        StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        // Unfiltered read: deletes from both partitions.
        Assertions.assertThat(actual(tableName, tab))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(allExpected);

        // Partition-filtered read: only the "a" deletes.
        Assertions.assertThat(actual(tableName, tab, "partition.data = 'a' AND pos >= 0"))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);
        dropTable(tableName);
    }

    /**
     * Same scenario as the identity-partition filter test, but on a table partitioned by
     * truncate({@code data}, 1): data values "aa"/"bb" land in partitions "a"/"b", and the filter
     * is applied to {@code partition.data_trunc}.
     */
    @TestTemplate
    public void testPartitionTransformFilter() throws IOException {
        String tableName = "partition_filter";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("data", 1).build();
        Table tab = createTable(tableName, SCHEMA, spec);
        Table deletesTab =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        DataFile dataFileA = dataFile(tab, new Object[] {"aa"}, new Object[] {"a"});
        DataFile dataFileB = dataFile(tab, new Object[] {"bb"}, new Object[] {"b"});
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        // NOTE(review): the "b" deletes are written against the "a" data file — confirm intentional.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
            deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
            deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionTemplate.copy("data_trunc", "a");
        Record partitionB = partitionTemplate.copy("data_trunc", "b");

        StructLikeSet expectedA =
            expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
        StructLikeSet expectedB =
            expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
        StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        // Unfiltered read: deletes from both partitions.
        Assertions.assertThat(actual(tableName, tab))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(allExpected);

        // Filter on the transformed partition column: only the "a" deletes.
        Assertions.assertThat(actual(tableName, tab, "partition.data_trunc = 'a' AND pos >= 0"))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);
        dropTable(tableName);
    }

    /**
     * Replaces the partition field {@code data} with {@code id} between two rounds of writes and
     * checks that filters on either partition column return the deletes written under the
     * matching spec, tagged with that spec's id.
     */
    @TestTemplate
    public void testPartitionEvolutionReplace() throws Exception {
        String tableName = "partition_evolution";
        PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, originalSpec);
        int dataSpecId = tab.spec().specId();

        // Round 1: data and deletes under the original spec (partitioned by data).
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        // NOTE(review): the "b" deletes are written against the "a" data file — confirm intentional.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Swap the partition field from data to id.
        tab.updateSpec().removeField("data").addField("id").commit();
        int idSpecId = tab.spec().specId();

        // Round 2: data and deletes under the new spec (partitioned by id).
        DataFile dataFile10 = dataFile(tab, 10);
        DataFile dataFile99 = dataFile(tab, 99);
        tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();

        // NOTE(review): the id=99 deletes are written against the id=10 data file — confirm intentional.
        Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10);
        Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99);
        tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();

        // Partition record shaped by Partitioning.partitionType(tab) rather than by a single spec.
        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        StructLikeSet expectedA =
            expected(
                tab,
                deletesA.first(),
                partitionTemplate.copy("data", "a"),
                dataSpecId,
                deletesA.second().path().toString());
        Assertions.assertThat(actualA)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);

        StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0");
        StructLikeSet expected10 =
            expected(
                tab,
                deletes10.first(),
                partitionTemplate.copy("id", 10),
                idSpecId,
                deletes10.second().path().toString());
        Assertions.assertThat(actual10)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expected10);
        dropTable(tableName);
    }

    /**
     * Starts unpartitioned, writes deletes, then adds {@code data} as a partition field and writes
     * more deletes. Verifies that {@code partition.data = 'a'} returns the deletes written under
     * the new spec and {@code partition.data IS NULL} returns the ones written while unpartitioned.
     */
    @TestTemplate
    public void testPartitionEvolutionAdd() throws Exception {
        String tableName = "partition_evolution_add";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int unpartitionedSpecId = tab.spec().specId();

        // Round 1: unpartitioned data and deletes.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
            deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        // Evolve: partition by data.
        tab.updateSpec().addField("data").commit();
        int dataSpecId = tab.spec().specId();

        // Round 2: partitioned data and deletes.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        StructLikeSet expectedA =
            expected(
                tab,
                deletesA.first(),
                partitionTemplate.copy("data", "a"),
                dataSpecId,
                deletesA.second().path().toString());
        Assertions.assertThat(actualA)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);

        StructLikeSet actualUnpartitioned =
            actual(tableName, tab, "partition.data IS NULL and pos >= 0");
        StructLikeSet expectedUnpartitioned =
            expected(
                tab,
                deletesUnpartitioned.first(),
                partitionTemplate.copy("data", (Object) null),
                unpartitionedSpecId,
                deletesUnpartitioned.second().path().toString());
        Assertions.assertThat(actualUnpartitioned)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedUnpartitioned);
        dropTable(tableName);
    }

    /**
     * Starts partitioned by {@code data}, writes deletes, then removes the partition field and
     * writes more deletes. Verifies that {@code partition.data = 'a'} returns the deletes written
     * under the original spec and {@code partition.data IS NULL} returns those written afterwards.
     */
    @TestTemplate
    public void testPartitionEvolutionRemove() throws Exception {
        String tableName = "partition_evolution_remove";
        PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, originalSpec);
        int dataSpecId = tab.spec().specId();

        // Round 1: partitioned data and deletes.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Evolve: drop the data partition field.
        tab.updateSpec().removeField("data").commit();
        int unpartitionedSpecId = tab.spec().specId();

        // Round 2: data and deletes written with no partition values.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
            deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        StructLikeSet expectedA =
            expected(
                tab,
                deletesA.first(),
                partitionTemplate.copy("data", "a"),
                dataSpecId,
                deletesA.second().path().toString());
        Assertions.assertThat(actualA)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);

        StructLikeSet actualUnpartitioned =
            actual(tableName, tab, "partition.data IS NULL and pos >= 0");
        StructLikeSet expectedUnpartitioned =
            expected(
                tab,
                deletesUnpartitioned.first(),
                partitionTemplate.copy("data", (Object) null),
                unpartitionedSpecId,
                deletesUnpartitioned.second().path().toString());
        Assertions.assertThat(actualUnpartitioned)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedUnpartitioned);
        dropTable(tableName);
    }

    /**
     * Writes deletes under an unpartitioned spec and, after adding {@code data} as a partition
     * field, under a second spec; then filters the position deletes table by {@code spec_id} and
     * checks that each spec id returns only the deletes written under it.
     */
    @TestTemplate
    public void testSpecIdFilter() throws Exception {
        String tableName = "spec_id_filter";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int unpartitionedSpecId = tab.spec().specId();

        // Spec 1: unpartitioned data and deletes.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
            deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        // Spec 2: partitioned by data.
        tab.updateSpec().addField("data").commit();
        int dataSpecId = tab.spec().specId();

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // The untouched template (no fields set) stands in for the unpartitioned partition value.
        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        StructLikeSet actualUnpartitioned =
            actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpecId));
        StructLikeSet expectedUnpartitioned =
            expected(
                tab,
                deletesUnpartitioned.first(),
                partitionTemplate,
                unpartitionedSpecId,
                deletesUnpartitioned.second().path().toString());
        Assertions.assertThat(actualUnpartitioned)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedUnpartitioned);

        Record partitionA = partitionTemplate.copy("data", "a");
        Record partitionB = partitionTemplate.copy("data", "b");
        StructLikeSet expectedPartitioned =
            expected(tab, deletesA.first(), partitionA, dataSpecId, deletesA.second().path().toString());
        expectedPartitioned.addAll(
            expected(tab, deletesB.first(), partitionB, dataSpecId, deletesB.second().path().toString()));

        Assertions.assertThat(actual(tableName, tab, String.format("spec_id = %d", dataSpecId)))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedPartitioned);
        dropTable(tableName);
    }

    /**
     * Writes deletes for partitions "a" and "b", adds two int columns ({@code new_col_1},
     * {@code new_col_2}) to the table schema, then writes data and deletes for partitions "c" and
     * "d".
     *
     * <p>Deletes written before the schema change are expected to be read back with the two new
     * columns set to null, so the expected "a" rows are rebuilt against the current table schema.
     */
    @TestTemplate
    public void testSchemaEvolutionAdd() throws Exception {
        String tableName = "schema_evolution_add";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        // Files written with the original schema.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        tab.updateSchema()
            .addColumn("new_col_1", Types.IntegerType.get())
            .addColumn("new_col_2", Types.IntegerType.get())
            .commit();

        // Files written with the evolved schema. Append the new files: the previous code
        // re-appended the "a" and "b" files and never added "c" and "d" to the table.
        DataFile dataFileC = dataFile(tab, "c");
        DataFile dataFileD = dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        Record partitionA = partitionTemplate.copy("data", "a");

        // Widen the old rows in place to the current schema; the added columns are null.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        expectedDeletesA.forEach(
            delete -> {
                GenericRecord oldRow = delete.get(2, GenericRecord.class);
                GenericRecord widened = GenericRecord.create(tab.schema().asStruct());
                widened.set(0, oldRow.get(0));
                widened.set(1, oldRow.get(1));
                widened.set(2, (Object) null);
                widened.set(3, (Object) null);
                delete.set(2, widened);
            });

        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        StructLikeSet expectedA =
            expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
        Assertions.assertThat(actualA)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);

        StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
        StructLikeSet expectedC =
            expected(
                tab,
                deletesC.first(),
                partitionTemplate.copy("data", "c"),
                deletesC.second().path().toString());
        Assertions.assertThat(actualC)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedC);
        dropTable(tableName);
    }

    /**
     * Starts from a four-column schema ({@code id}, {@code data}, {@code new_col_1},
     * {@code new_col_2}), writes deletes for partitions "a" and "b", drops the two optional
     * columns, then writes data and deletes for partitions "c" and "d".
     *
     * <p>Deletes written before the schema change are expected to be read back with only the two
     * remaining columns, so the expected "a" rows are rebuilt against the current table schema.
     */
    @TestTemplate
    public void testSchemaEvolutionRemove() throws Exception {
        String tableName = "schema_evolution_remove";
        Schema wideSchema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
                Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(wideSchema).identity("data").build();
        Table tab = createTable(tableName, wideSchema, spec);

        // Files written with the original (wide) schema.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();

        // Files written with the narrowed schema. Append the new files: the previous code
        // re-appended the "a" and "b" files and never added "c" and "d" to the table.
        DataFile dataFileC = dataFile(tab, "c");
        DataFile dataFileD = dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        Record partitionA = partitionTemplate.copy("data", "a");

        // Narrow the old rows in place to the current schema, keeping only the first two columns.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        expectedDeletesA.forEach(
            delete -> {
                GenericRecord oldRow = delete.get(2, GenericRecord.class);
                GenericRecord narrowed = GenericRecord.create(tab.schema().asStruct());
                narrowed.set(0, oldRow.get(0));
                narrowed.set(1, oldRow.get(1));
                delete.set(2, narrowed);
            });

        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        StructLikeSet expectedA =
            expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
        Assertions.assertThat(actualA)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedA);

        StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
        StructLikeSet expectedC =
            expected(
                tab,
                deletesC.first(),
                partitionTemplate.copy("data", "c"),
                deletesC.second().path().toString());
        Assertions.assertThat(actualC)
            .as("Position Delete table should contain expected rows")
            .isEqualTo(expectedC);
        dropTable(tableName);
    }

    /**
     * Rewrites the position deletes of each partition ("a", then "b") through Spark and verifies
     * that the table still exposes the same deletes afterwards.
     *
     * <p>For each partition the scan tasks are staged under a random id, read back with that id as
     * {@code scan-task-set-id}, appended to the position deletes table with the same id as
     * {@code rewritten-file-scan-task-set-id}, and committed through the {@code commit} helper.
     * The read is asserted to produce exactly one Spark partition.
     *
     * <p>The final comparison projects only {@link #NON_PATH_COLS} and uses a schema with field id
     * 2147483646 removed. NOTE(review): presumably that id is the delete_file_path column, whose
     * value changes after the rewrite — confirm.
     */
    @TestTemplate
    public void testWrite() throws IOException, NoSuchTableException {
        String tableName = "test_write";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        for (String partValue : ImmutableList.of("a", "b")) {
            // try-with-resources closes the task iterable exactly once, on success or failure.
            try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
                String fileSetId = UUID.randomUUID().toString();
                stageTask(tab, fileSetId, tasks);

                Dataset<?> scanDf =
                    spark.read()
                        .format("iceberg")
                        .option("scan-task-set-id", fileSetId)
                        .option("file-open-cost", (long) Integer.MAX_VALUE)
                        .load(posDeletesTableName);
                Assertions.assertThat(scanDf.javaRDD().getNumPartitions()).isEqualTo(1);

                scanDf.writeTo(posDeletesTableName)
                    .option("rewritten-file-scan-task-set-id", fileSetId)
                    .append();
                commit(tab, posDeletesTable, fileSetId, 1);
            }
        }

        GenericRecord partitionTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionTemplate.copy("data", "a");
        Record partitionB = partitionTemplate.copy("data", "b");

        // Expected rows carry no delete file path.
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, null);
        StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, null);
        StructLikeSet allExpected =
            StructLikeSet.create(
                TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(2147483646)).asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        Assertions.assertThat(actual(tableName, tab, null, NON_PATH_COLS))
            .as("Position Delete table should contain expected rows")
            .isEqualTo(allExpected);
        dropTable(tableName);
    }

    /**
     * Rewrites the position deletes of an unpartitioned table whose delete file was written from
     * (path, position) pairs only, then verifies that the metadata table still exposes both
     * deletes with a null {@code row}.
     */
    @TestTemplate
    public void testWriteUnpartitionedNullRows() throws Exception {
        String tableName = "write_null_rows";
        Table table = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        DataFile dataFile = dataFile(table);
        table.newAppend().appendFile(dataFile).commit();

        // Deletes are described by path and position only; no deleted-row payload is supplied.
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
        deletes.add(Pair.of(dataFile.path(), 0L));
        deletes.add(Pair.of(dataFile.path(), 1L));
        DeleteFile deleteFile = (DeleteFile) FileHelpers.writeDeleteFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of(0),
                deletes).first();
        table.newRowDelta().addDeletes(deleteFile).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // try-with-resources closes the planned tasks exactly once, on success or failure.
        try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
            String fileSetId = UUID.randomUUID().toString();
            stageTask(table, fileSetId, tasks);
            Dataset<?> scan = spark.read().format("iceberg")
                    .option("scan-task-set-id", fileSetId)
                    .option("file-open-cost", 2147483647L)
                    .load(posDeletesTableName);
            Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
            scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
            commit(table, posDeletesTable, fileSetId, 1);
        }

        StructLikeSet actualRows = actual(tableName, table, null, ImmutableList.of("file_path", "pos", "row", "spec_id"));
        List<PositionDelete<?>> expectedDeletes = Lists.newArrayList(
                positionDelete(dataFile.path(), 0L),
                positionDelete(dataFile.path(), 1L));
        StructLikeSet expectedRows = expected(table, expectedDeletes, null, null);
        Assertions.assertThat(actualRows).as("Position Delete table should contain expected rows").isEqualTo(expectedRows);
        dropTable(tableName);
    }

    /**
     * Rewrites position deletes for a table partitioned by {@code data} where partition "a" has
     * deletes written from (path, position) pairs only and partition "b" has deletes produced by
     * the {@code deleteFile} helper, then verifies the metadata table content.
     */
    @TestTemplate
    public void testWriteMixedRows() throws Exception {
        String tableName = "write_mixed_rows";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = createTable(tableName, SCHEMA, spec);
        DataFile dataFileA = dataFile(table, "a");
        DataFile dataFileB = dataFile(table, "b");
        table.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        // Partition "a": deletes described by path and position only.
        List<Pair<CharSequence, Long>> deletesWithoutRow = Lists.newArrayList();
        deletesWithoutRow.add(Pair.of(dataFileA.path(), 0L));
        deletesWithoutRow.add(Pair.of(dataFileA.path(), 1L));
        DeleteFile deleteFileA = (DeleteFile) FileHelpers.writeDeleteFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of("a"),
                deletesWithoutRow).first();

        // Partition "b": deletes produced by the shared helper.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(table, dataFileB, "b");
        table.newRowDelta().addDeletes(deleteFileA).addDeletes(deletesB.second()).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite each partition independently; every task set is closed exactly once.
        for (String partValue : ImmutableList.of("a", "b")) {
            try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
                String fileSetId = UUID.randomUUID().toString();
                stageTask(table, fileSetId, tasks);
                Dataset<?> scan = spark.read().format("iceberg")
                        .option("scan-task-set-id", fileSetId)
                        .load(posDeletesTableName);
                Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
                scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
                commit(table, posDeletesTable, fileSetId, 1);
            }
        }

        StructLikeSet actualRows = actual(tableName, table, null,
                ImmutableList.of("file_path", "pos", "row", "partition", "spec_id"));

        GenericRecord partitionTemplate = GenericRecord.create(table.spec().partitionType());
        Record partitionA = partitionTemplate.copy("data", "a");
        Record partitionB = partitionTemplate.copy("data", "b");
        StructLikeSet expectedRows = StructLikeSet.create(
                TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(2147483646)).asStruct());
        List<PositionDelete<?>> expectedDeletesA = Lists.newArrayList(
                positionDelete(dataFileA.path(), 0L),
                positionDelete(dataFileA.path(), 1L));
        expectedRows.addAll(expected(table, expectedDeletesA, partitionA, null));
        expectedRows.addAll(expected(table, deletesB.first(), partitionB, null));

        Assertions.assertThat(actualRows).as("Position Delete table should contain expected rows").isEqualTo(expectedRows);
        dropTable(tableName);
    }

    /**
     * Rewrites position deletes on a table that starts unpartitioned and later gains a
     * {@code data} partition field. Deletes written under the original spec are rewritten first
     * and verified, then the deletes of partitions "a" and "b" under the new spec.
     */
    @TestTemplate
    public void testWritePartitionEvolutionAdd() throws Exception {
        String tableName = "write_partition_evolution_add";
        Table table = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int unpartitionedSpecId = table.spec().specId();

        // Data and deletes written under the original, unpartitioned spec.
        DataFile unpartitionedData = dataFile(table);
        table.newAppend().appendFile(unpartitionedData).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> unpartitionedDeletes = deleteFile(table, unpartitionedData);
        table.newRowDelta().addDeletes(unpartitionedDeletes.second()).commit();

        // Evolve the spec, then write data and deletes for partitions "a" and "b".
        table.updateSpec().addField("data").commit();
        int partitionedSpecId = table.spec().specId();
        DataFile dataFileA = dataFile(table, "a");
        DataFile dataFileB = dataFile(table, "b");
        table.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(table, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(table, dataFileB, "b");
        table.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite the delete files whose partition.data is null (the unpartitioned ones).
        try (CloseableIterable<ScanTask> tasks =
                posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles()) {
            String fileSetId = UUID.randomUUID().toString();
            stageTask(table, fileSetId, tasks);
            Dataset<?> scan = spark.read().format("iceberg")
                    .option("scan-task-set-id", fileSetId)
                    .option("file-open-cost", 2147483647L)
                    .load(posDeletesTableName);
            Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
            scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
            commit(table, posDeletesTable, fileSetId, 1);
        }

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(table));
        StructLikeSet actualUnpartitioned = actual(tableName, table, "partition.data IS NULL", NON_PATH_COLS);
        StructLikeSet expectedUnpartitioned = expected(
                table, unpartitionedDeletes.first(), partitionTemplate.copy("data", null), unpartitionedSpecId, null);
        Assertions.assertThat(actualUnpartitioned)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expectedUnpartitioned);

        // Rewrite each partition written under the evolved spec.
        for (String partValue : ImmutableList.of("a", "b")) {
            try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
                String fileSetId = UUID.randomUUID().toString();
                stageTask(table, fileSetId, tasks);
                Dataset<?> scan = spark.read().format("iceberg")
                        .option("scan-task-set-id", fileSetId)
                        .option("file-open-cost", 2147483647L)
                        .load(posDeletesTableName);
                Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
                scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
                commit(table, posDeletesTable, fileSetId, 1);
            }
        }

        Record partitionA = partitionTemplate.copy("data", "a");
        Record partitionB = partitionTemplate.copy("data", "b");
        StructLikeSet expectedPartitioned = StructLikeSet.create(
                TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(2147483646)).asStruct());
        expectedPartitioned.addAll(expected(table, deletesA.first(), partitionA, partitionedSpecId, null));
        expectedPartitioned.addAll(expected(table, deletesB.first(), partitionB, partitionedSpecId, null));
        StructLikeSet actualPartitioned =
                actual(tableName, table, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS);
        Assertions.assertThat(actualPartitioned)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expectedPartitioned);
        dropTable(tableName);
    }

    /**
     * Verifies that a staged position-delete rewrite cannot be written back once the table's
     * partition spec has been changed between the scan and the write: the write is expected to
     * fail with an {@code AnalysisException} about the missing {@code partition} column.
     */
    @TestTemplate
    public void testWritePartitionEvolutionDisallowed() throws Exception {
        String tableName = "write_partition_evolution_write";
        Table table = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        DataFile dataFile = dataFile(table);
        table.newAppend().appendFile(dataFile).commit();
        table.newRowDelta().addDeletes(deleteFile(table, dataFile).second()).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        String fileSetId = UUID.randomUUID().toString();

        // Assigned exactly once inside the resource scope so the lambda below may capture it.
        Dataset<?> scan;
        try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
            stageTask(table, fileSetId, tasks);
            scan = spark.read().format("iceberg")
                    .option("scan-task-set-id", fileSetId)
                    .option("file-open-cost", 2147483647L)
                    .load(posDeletesTableName);
            Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);

            // Change the partition spec after the scan was planned.
            table.updateSpec().addField("data").commit();
        }

        Assertions.assertThatThrownBy(
                        () -> scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append())
                .isInstanceOf(AnalysisException.class)
                .hasMessage("[INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA] Cannot write incompatible data for the table `"
                        + this.catalogName + "`.`default`.`" + tableName
                        + "`.`position_deletes`: Cannot find data for the output column `partition`.");
        dropTable(tableName);
    }

    /**
     * Rewrites position deletes after two columns were added to the table schema. Deletes written
     * before the schema change (partition "a") are expected to come back with the two new columns
     * set to null; deletes written afterwards (partition "c") are expected unchanged.
     */
    @TestTemplate
    public void testWriteSchemaEvolutionAdd() throws Exception {
        String tableName = "write_schema_evolution_add";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = createTable(tableName, SCHEMA, spec);

        // Files written with the original schema.
        DataFile dataFileA = dataFile(table, "a");
        DataFile dataFileB = dataFile(table, "b");
        table.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(table, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(table, dataFileB, "b");
        table.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        table.updateSchema()
                .addColumn("new_col_1", Types.IntegerType.get())
                .addColumn("new_col_2", Types.IntegerType.get())
                .commit();

        // Files written with the evolved schema; these are the ones the new deletes refer to.
        DataFile dataFileC = dataFile(table, "c");
        DataFile dataFileD = dataFile(table, "d");
        table.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(table, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(table, dataFileD, "d");
        table.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite the deletes of partition "a" (written before the schema change).
        try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "a")) {
            String fileSetId = UUID.randomUUID().toString();
            stageTask(table, fileSetId, tasks);
            Dataset<?> scan = spark.read().format("iceberg")
                    .option("scan-task-set-id", fileSetId)
                    .option("file-open-cost", 2147483647L)
                    .load(posDeletesTableName);
            Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
            scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
            commit(table, posDeletesTable, fileSetId, 1);
        }

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(table));
        Record partitionA = partitionTemplate.copy("data", "a");

        // Pad the old rows to the current schema: the two added columns are expected to be null.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        for (PositionDelete<?> delete : expectedDeletesA) {
            GenericRecord oldRow = delete.get(2, GenericRecord.class);
            GenericRecord paddedRow = GenericRecord.create(table.schema().asStruct());
            paddedRow.set(0, oldRow.get(0));
            paddedRow.set(1, oldRow.get(1));
            paddedRow.set(2, null);
            paddedRow.set(3, null);
            delete.set(2, paddedRow);
        }
        StructLikeSet actualA = actual(tableName, table, "partition.data = 'a'", NON_PATH_COLS);
        Assertions.assertThat(actualA)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expected(table, expectedDeletesA, partitionA, null));

        // Rewrite the deletes of partition "c" (written after the schema change).
        try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "c")) {
            String fileSetId = UUID.randomUUID().toString();
            stageTask(table, fileSetId, tasks);
            Dataset<?> scan = spark.read().format("iceberg")
                    .option("scan-task-set-id", fileSetId)
                    .option("file-open-cost", 2147483647L)
                    .load(posDeletesTableName);
            Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
            scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
            commit(table, posDeletesTable, fileSetId, 1);
        }

        StructLikeSet actualC = actual(tableName, table, "partition.data = 'c'", NON_PATH_COLS);
        Assertions.assertThat(actualC)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expected(table, deletesC.first(), partitionTemplate.copy("data", "c"), null));
        dropTable(tableName);
    }

    /**
     * Rewrites position deletes after two columns were dropped from the table schema. Deletes
     * written before the change (partition "a") are expected to come back with only the two
     * remaining columns; deletes written afterwards (partition "c") are expected unchanged.
     */
    @TestTemplate
    public void testWriteSchemaEvolutionRemove() throws Exception {
        String tableName = "write_schema_evolution_remove";
        Schema oldSchema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
                Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
        Table table = createTable(tableName, oldSchema, spec);

        // Files written with the original four-column schema.
        DataFile dataFileA = dataFile(table, "a");
        DataFile dataFileB = dataFile(table, "b");
        table.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(table, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(table, dataFileB, "b");
        table.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        table.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();

        // Files written with the reduced schema; these are the ones the new deletes refer to.
        DataFile dataFileC = dataFile(table, "c");
        DataFile dataFileD = dataFile(table, "d");
        table.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(table, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(table, dataFileD, "d");
        table.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite every partition; each task set is closed exactly once.
        for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
            try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
                String fileSetId = UUID.randomUUID().toString();
                stageTask(table, fileSetId, tasks);
                Dataset<?> scan = spark.read().format("iceberg")
                        .option("scan-task-set-id", fileSetId)
                        .option("file-open-cost", 2147483647L)
                        .load(posDeletesTableName);
                Assertions.assertThat(scan.javaRDD().getNumPartitions()).isEqualTo(1);
                scan.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetId).append();
                commit(table, posDeletesTable, fileSetId, 1);
            }
        }

        GenericRecord partitionTemplate = GenericRecord.create(Partitioning.partitionType(table));
        Record partitionA = partitionTemplate.copy("data", "a");

        // Trim the old rows to the current schema: only the first two columns remain.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        for (PositionDelete<?> delete : expectedDeletesA) {
            GenericRecord oldRow = delete.get(2, GenericRecord.class);
            GenericRecord trimmedRow = GenericRecord.create(table.schema().asStruct());
            trimmedRow.set(0, oldRow.get(0));
            trimmedRow.set(1, oldRow.get(1));
            delete.set(2, trimmedRow);
        }
        StructLikeSet actualA = actual(tableName, table, "partition.data = 'a'", NON_PATH_COLS);
        Assertions.assertThat(actualA)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expected(table, expectedDeletesA, partitionA, null));

        StructLikeSet actualC = actual(tableName, table, "partition.data = 'c'", NON_PATH_COLS);
        Assertions.assertThat(actualC)
                .as("Position Delete table should contain expected rows")
                .isEqualTo(expected(table, deletesC.first(), partitionTemplate.copy("data", "c"), null));
        dropTable(tableName);
    }

    /**
     * Verifies that appending to the position_deletes metadata table without a staged rewrite
     * task set is rejected with an {@code IllegalArgumentException}.
     */
    @TestTemplate
    public void testNormalWritesNotAllowed() throws IOException {
        String tableName = "test_normal_write_not_allowed";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = createTable(tableName, SCHEMA, spec);

        DataFile dataFile = dataFile(table, "a");
        table.newAppend().appendFile(dataFile).commit();
        DeleteFile deleteFile = deleteFile(table, dataFile, "a").second();
        table.newRowDelta().addDeletes(deleteFile).commit();

        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        Dataset<?> scan = spark.read().format("iceberg").load(posDeletesTableName);

        Assertions.assertThatThrownBy(() -> scan.writeTo(posDeletesTableName).append())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Can only write to " + posDeletesTableName + " via actions");
        dropTable(tableName);
    }

    /** Reads every row and every column of the table's position_deletes metadata table. */
    private StructLikeSet actual(String tableName, Table table) {
        String noFilter = null;
        List<String> allColumns = null;
        return actual(tableName, table, noFilter, allColumns);
    }

    /** Reads all columns of the position_deletes metadata table, optionally filtered. */
    private StructLikeSet actual(String tableName, Table table, String filter) {
        List<String> allColumns = null;
        return actual(tableName, table, filter, allColumns);
    }

    /**
     * Reads the position_deletes metadata table through Spark and collects the rows into a
     * {@code StructLikeSet}.
     *
     * @param tableName name of the base table inside the {@code default} namespace
     * @param table base table, used to derive the struct type of the metadata table
     * @param filter Spark SQL filter expression to apply, or null for no filter
     * @param cols columns to select, or null for all; when non-null it must contain at least one
     *     name, because the first entry is passed separately to {@code select}
     * @return the collected rows, typed by the (possibly projected) metadata table struct
     */
    private StructLikeSet actual(String tableName, Table table, String filter, List<String> cols) {
        // Row is spelled fully qualified so the lambda-free loop below is type-safe without
        // depending on which Row class the file imports.
        Dataset<org.apache.spark.sql.Row> df =
                spark.read().format("iceberg").load(this.catalogName + ".default." + tableName + ".position_deletes");
        if (filter != null) {
            df = df.filter(filter);
        }
        if (cols != null) {
            df = df.select(cols.get(0), cols.subList(1, cols.size()).toArray(new String[0]));
        }

        Types.StructType fullType = MetadataTableUtils
                .createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES)
                .schema()
                .asStruct();
        // Keep only the selected fields, in the metadata table's own field order.
        Types.StructType projectedType = cols == null
                ? fullType
                : Types.StructType.of(
                        fullType.fields().stream()
                                .filter(field -> cols.contains(field.name()))
                                .collect(Collectors.toList()));

        StructLikeSet rows = StructLikeSet.create(projectedType);
        for (org.apache.spark.sql.Row row : df.collectAsList()) {
            // A fresh wrapper per row: the set keeps a reference to each wrapper it is given.
            rows.add(new SparkStructLike(projectedType).wrap(row));
        }
        return rows;
    }

    /**
     * Creates a format-version 2 table in the {@code default} namespace of the validation
     * catalog, using this test's file format as the default write format.
     */
    protected Table createTable(String name, Schema schema, PartitionSpec spec) {
        Map<String, String> properties =
                ImmutableMap.of("format-version", "2", "write.format.default", this.format.toString());
        TableIdentifier identifier = TableIdentifier.of("default", name);
        return this.validationCatalog.createTable(identifier, schema, spec, properties);
    }

    /** Drops the named table from the {@code default} namespace, passing {@code false} for purge. */
    protected void dropTable(String name) {
        TableIdentifier identifier = TableIdentifier.of("default", name);
        this.validationCatalog.dropTable(identifier, false);
    }

    /** Builds a position delete for the given file path and position with a null row. */
    private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
        PositionDelete<GenericRecord> delete = PositionDelete.create();
        long pos = position.longValue();
        delete.set(path, pos, null);
        return delete;
    }

    /**
     * Builds a position delete that carries the deleted row.
     *
     * @param schema schema used to create the row record
     * @param path path of the data file the delete applies to
     * @param position row position inside that file
     * @param values row values, assigned to the record by index in the order given
     */
    private PositionDelete<GenericRecord> positionDelete(Schema schema, CharSequence path, Long position, Object... values) {
        GenericRecord row = GenericRecord.create(schema);
        int index = 0;
        for (Object value : values) {
            row.set(index, value);
            index++;
        }
        PositionDelete<GenericRecord> delete = PositionDelete.create();
        delete.set(path, position.longValue(), row);
        return delete;
    }

    /**
     * Builds the rows the position_deletes metadata table is expected to contain for the given
     * deletes.
     *
     * @param table base table whose position_deletes metadata table defines the row type
     * @param deletes position deletes to convert into expected rows
     * @param partitionStruct value for the {@code partition} field, or null to leave it unset
     * @param specId value for the {@code spec_id} field
     * @param deleteFilePath value for the {@code delete_file_path} field; when null the field with
     *     id 2147483646 is dropped from the row type (presumably the delete_file_path column,
     *     to be confirmed against the metadata table schema)
     * @return a set with one record per delete
     */
    private StructLikeSet expected(Table table, List<PositionDelete<?>> deletes, StructLike partitionStruct, int specId, String deleteFilePath) {
        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        Types.StructType rowType = deleteFilePath == null
                ? TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(2147483646)).asStruct()
                : posDeletesTable.schema().asStruct();

        StructLikeSet rows = StructLikeSet.create(rowType);
        for (PositionDelete<?> delete : deletes) {
            GenericRecord row = GenericRecord.create(rowType);
            row.setField("file_path", delete.path());
            row.setField("pos", Long.valueOf(delete.pos()));
            row.setField("row", delete.row());
            if (partitionStruct != null) {
                row.setField("partition", partitionStruct);
            }
            row.setField("spec_id", Integer.valueOf(specId));
            if (deleteFilePath != null) {
                row.setField("delete_file_path", deleteFilePath);
            }
            rows.add(row);
        }
        return rows;
    }

    /** Same as the five-argument overload, using the table's current partition spec id. */
    private StructLikeSet expected(Table table, List<PositionDelete<?>> deletes, StructLike partitionStruct, String deleteFilePath) {
        int currentSpecId = table.spec().specId();
        return expected(table, deletes, partitionStruct, currentSpecId, deleteFilePath);
    }

    /** Writes a data file, using the same values for the row data and the partition tuple. */
    private DataFile dataFile(Table table, Object... partValues) throws IOException {
        Object[] partitionDataValues = partValues;
        Object[] partitionFieldValues = partValues;
        return dataFile(table, partitionDataValues, partitionFieldValues);
    }

    /**
     * Writes a four-row data file to a temp location.
     *
     * <p>If the table's spec has a partition field named {@code id} or {@code data}, every row
     * takes that column's value from {@code partitionDataValues} (indexed by the field's position
     * in the spec); otherwise the rows use the defaults (29, "c"), (43, "k"), (61, "r"), (89, "t").
     * Any schema column at index 2 or higher is filled with its own column index.
     *
     * @param table table to write the file for
     * @param partitionDataValues per-partition-field values copied into the rows
     * @param partitionFieldValues values of the partition tuple the file is written under
     * @return the written data file
     * @throws IOException if the temp file cannot be created or written
     */
    private DataFile dataFile(Table table, Object[] partitionDataValues, Object[] partitionFieldValues) throws IOException {
        GenericRecord template = GenericRecord.create(table.schema());

        List<String> partitionFieldNames =
                table.spec().fields().stream().map(field -> field.name()).collect(Collectors.toList());
        int idIndex = partitionFieldNames.indexOf("id");
        int dataIndex = partitionFieldNames.indexOf("data");
        Integer idPartition = idIndex != -1 ? (Integer) partitionDataValues[idIndex] : null;
        String dataPartition = dataIndex != -1 ? (String) partitionDataValues[dataIndex] : null;

        List<Record> records = Lists.newArrayList(
                template.copy("id", idPartition != null ? idPartition : Integer.valueOf(29),
                        "data", dataPartition != null ? dataPartition : "c"),
                template.copy("id", idPartition != null ? idPartition : Integer.valueOf(43),
                        "data", dataPartition != null ? dataPartition : "k"),
                template.copy("id", idPartition != null ? idPartition : Integer.valueOf(61),
                        "data", dataPartition != null ? dataPartition : "r"),
                template.copy("id", idPartition != null ? idPartition : Integer.valueOf(89),
                        "data", dataPartition != null ? dataPartition : "t"));

        // Columns beyond id/data are filled with their own column index.
        int columnCount = table.schema().columns().size();
        for (int col = 2; col < columnCount; col++) {
            for (Record record : records) {
                record.set(col, Integer.valueOf(col));
            }
        }

        return FileHelpers.writeDataFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of(partitionFieldValues),
                records);
    }

    /** Writes a delete file, using the same values for the row data and the partition tuple. */
    private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(Table table, DataFile dataFile, Object... partValues) throws IOException {
        Object[] partitionDataValues = partValues;
        Object[] partitionFieldValues = partValues;
        return deleteFile(table, dataFile, partitionDataValues, partitionFieldValues);
    }

    /**
     * Writes a position delete file that deletes positions 0 and 1 of {@code dataFile}, with the
     * deleted rows attached.
     *
     * <p>If the table's spec has a partition field named {@code id} or {@code data}, the deleted
     * rows take that column's value from {@code partitionDataValues}; otherwise they use the
     * defaults (29, "c") and (61, "r"). Any schema column at index 2 or higher is filled with its
     * own column index.
     *
     * @param table table to write the delete file for
     * @param dataFile data file the deletes point at
     * @param partitionDataValues per-partition-field values copied into the deleted rows
     * @param partitionFieldValues values of the partition tuple the file is written under
     * @return the in-memory deletes paired with the written delete file
     * @throws IOException if the temp file cannot be created or written
     */
    private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(Table table, DataFile dataFile, Object[] partitionDataValues, Object[] partitionFieldValues) throws IOException {
        List<String> partitionFieldNames =
                table.spec().fields().stream().map(field -> field.name()).collect(Collectors.toList());
        int idIndex = partitionFieldNames.indexOf("id");
        int dataIndex = partitionFieldNames.indexOf("data");
        Integer idPartition = idIndex != -1 ? (Integer) partitionDataValues[idIndex] : null;
        String dataPartition = dataIndex != -1 ? (String) partitionDataValues[dataIndex] : null;

        List<PositionDelete<?>> deletes = Lists.newArrayList(
                positionDelete(table.schema(), dataFile.path(), 0L,
                        idPartition != null ? idPartition : Integer.valueOf(29),
                        dataPartition != null ? dataPartition : "c"),
                positionDelete(table.schema(), dataFile.path(), 1L,
                        idPartition != null ? idPartition : Integer.valueOf(61),
                        dataPartition != null ? dataPartition : "r"));

        // Columns beyond id/data in the deleted row are filled with their own column index.
        int columnCount = table.schema().columns().size();
        for (int col = 2; col < columnCount; col++) {
            for (PositionDelete<?> delete : deletes) {
                GenericRecord deletedRow = delete.get(2, GenericRecord.class);
                deletedRow.set(col, Integer.valueOf(col));
            }
        }

        DeleteFile written = FileHelpers.writePosDeleteFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, this.temp.toFile())),
                TestHelpers.Row.of(partitionFieldValues),
                deletes);
        return Pair.of(deletes, written);
    }

    /** Materializes the tasks into a list and stages them under the given task set id. */
    private <T extends ScanTask> void stageTask(Table table, String fileSetId, CloseableIterable<T> tasks) {
        List<T> materialized = Lists.newArrayList(tasks);
        ScanTaskSetManager manager = ScanTaskSetManager.get();
        manager.stageTasks(table, fileSetId, materialized);
    }

    /**
     * Commits a staged position-delete rewrite: the delete files of the staged scan tasks are
     * replaced by the new files recorded for the same task set id.
     *
     * @param baseTable table on which the rewrite is committed
     * @param posDeletesTable metadata table used to look up staged tasks and new files
     * @param fileSetId id of the staged task set
     * @param expectedSourceFiles expected number of distinct delete files being rewritten
     * @param expectedTargetFiles expected number of newly written delete files
     */
    private void commit(Table baseTable, Table posDeletesTable, String fileSetId, int expectedSourceFiles, int expectedTargetFiles) {
        PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();

        Set<DeleteFile> rewrittenFiles = ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetId).stream()
                .map(task -> ((PositionDeletesScanTask) task).file())
                .collect(Collectors.toCollection(DeleteFileSet::create));
        Set<DeleteFile> addedFiles = coordinator.fetchNewFiles(posDeletesTable, fileSetId);

        Assertions.assertThat(rewrittenFiles).hasSize(expectedSourceFiles);
        Assertions.assertThat(addedFiles).hasSize(expectedTargetFiles);

        // The rewrite must have produced files at different paths than the ones it replaces.
        List<String> rewrittenPaths = rewrittenFiles.stream()
                .map(file -> file.path().toString())
                .sorted()
                .collect(Collectors.toList());
        List<String> addedPaths = addedFiles.stream()
                .map(file -> file.path().toString())
                .sorted()
                .collect(Collectors.toList());
        Assertions.assertThat(rewrittenPaths).as("Lists should not be the same").isNotEqualTo(addedPaths);

        baseTable.newRewrite()
                .rewriteFiles(ImmutableSet.of(), rewrittenFiles, ImmutableSet.of(), addedFiles)
                .commit();
    }

    /** Commits a staged rewrite that is expected to read and write the same number of files. */
    private void commit(Table baseTable, Table posDeletesTable, String fileSetId, int expectedFiles) {
        int expectedSourceFiles = expectedFiles;
        int expectedTargetFiles = expectedFiles;
        commit(baseTable, posDeletesTable, fileSetId, expectedSourceFiles, expectedTargetFiles);
    }

    /**
     * Plans the scan tasks of the given table filtered by {@code partition.<partitionColumn> =
     * partitionValue}, then keeps only the tasks whose first partition field is non-null and
     * equal to {@code partitionValue}.
     */
    private CloseableIterable<ScanTask> tasks(Table posDeletesTable, String partitionColumn, String partitionValue) {
        CloseableIterable<ScanTask> planned = ((BatchScan) posDeletesTable.newBatchScan()
                .filter(Expressions.equal("partition." + partitionColumn, partitionValue)))
                .planFiles();
        return CloseableIterable.filter(planned, task -> {
            StructLike partition = ((PositionDeletesScanTask) task).partition();
            String actualValue = partition.get(0, String.class);
            if (actualValue == null) {
                return false;
            }
            return actualValue.equals(partitionValue);
        });
    }
}
