package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.FourColumnRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.class */
public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
    /** Name of the table each test creates in the {@code default} namespace. */
    private static final String TABLE_NAME = "test_table";

    /** Table schema: optional int {@code c1} plus optional strings {@code c2} and {@code c3}. */
    private static final Schema SCHEMA =
        new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));

    /** Catalog configuration passed as the {@code config} test parameter. */
    private static final Map<String, String> CATALOG_PROPS =
        ImmutableMap.of(
            "type", "hive",
            "default-namespace", "default",
            "cache-enabled", "false");

    /** Record count handed to the table-creation and record-writing helpers. */
    private static final int SCALE = 4000;

    /** Delete count handed to {@code writePosDeletesForFiles}. */
    private static final int DELETES_SCALE = 1000;

    @TempDir
    private Path temp;

    /** File format under test; injected from index 3 of each parameter set. */
    @Parameter(index = 3)
    private FileFormat format;

    /**
     * Supplies the parameter sets for the templated tests.
     *
     * <p>Each row holds, in order: catalog name, catalog implementation, catalog config and file
     * format, matching the placeholders in the {@code @Parameters} name pattern.
     *
     * @return a single parameter row using the Hive catalog and the Parquet file format
     */
    @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}")
    public static Object[][] parameters() {
        // Must be a two-dimensional array: one inner array per parameter set.
        return new Object[][] {
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS,
                FileFormat.PARQUET
            }
        };
    }

    /** Drops the {@code default.test_table} table after each test. */
    @AfterEach
    public void cleanup() {
        TableIdentifier testTable = TableIdentifier.of("default", TABLE_NAME);
        this.validationCatalog.dropTable(testTable);
    }

    /** Running the rewrite on a freshly created, empty table must rewrite and add nothing. */
    @TestTemplate
    public void testEmptyTable() {
        SparkActions actions = SparkActions.get(spark);
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        Table table = this.validationCatalog.createTable(identifier, SCHEMA, spec, tableProperties());

        RewritePositionDeleteFiles.Result result = actions.rewritePositionDeletes(table).execute();

        Assertions.assertThat(result.rewrittenDeleteFilesCount()).as("No rewritten delete files").isZero();
        Assertions.assertThat(result.addedDeleteFilesCount()).as("No added delete files").isZero();
    }

    /** Runs the shared granularity check with {@link DeleteGranularity#FILE}. */
    @TestTemplate
    public void testFileGranularity() throws Exception {
        checkDeleteGranularity(DeleteGranularity.FILE);
    }

    /** Runs the shared granularity check with {@link DeleteGranularity#PARTITION}. */
    @TestTemplate
    public void testPartitionGranularity() throws Exception {
        checkDeleteGranularity(DeleteGranularity.PARTITION);
    }

    /**
     * Verifies how many delete files a full rewrite adds for the given delete granularity.
     *
     * <p>The unpartitioned table starts with two data files and two delete files; the rewrite is
     * expected to add two delete files for {@code FILE} granularity and one otherwise.
     *
     * @param deleteGranularity value written to the {@code write.delete.granularity} table property
     */
    private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        table.updateProperties()
            .set("write.delete.granularity", deleteGranularity.toString())
            .commit();

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        Assertions.assertThat(dataFiles).hasSize(2);

        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(deleteFiles(table)).hasSize(2);

        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        int expectedAddedDeleteFiles;
        if (deleteGranularity == DeleteGranularity.FILE) {
            expectedAddedDeleteFiles = 2;
        } else {
            expectedAddedDeleteFiles = 1;
        }
        Assertions.assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedAddedDeleteFiles);
    }

    /**
     * Rewrites every position delete file of an unpartitioned table and checks that the two
     * original delete files collapse into one while rows and position deletes stay the same.
     */
    @TestTemplate
    public void testUnpartitioned() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(2);

        // Snapshot the table contents before the rewrite.
        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(2000);
        Assertions.assertThat(expectedDeletes).hasSize(2000);

        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).as("Expected 1 new delete file").hasSize(1);
        assertLocallySorted(rewrittenDeleteFiles);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 1);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        // The rewrite must not change what a reader sees.
        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites all position deletes of a partitioned table with an effectively unlimited target
     * file size and checks that eight delete files become four without changing table contents.
     */
    @TestTemplate
    public void testRewriteAll() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(expectedDeletes).hasSize(4000);

        RewritePositionDeleteFilesSparkAction rewriteAll =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                rewriteAll.option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1));
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).hasSize(4);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 4);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites only the position deletes matching a filter on {@code c3} and {@code c1 in (1, 2)}
     * and checks that exactly two new delete files appear while table contents stay the same.
     */
    @TestTemplate
    public void testRewriteFilter() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        table.refresh();
        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(expectedDeletes).hasSize(4000);

        RewritePositionDeleteFilesSparkAction filtered =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark)
                    .rewritePositionDeletes(table)
                    .filter(
                        Expressions.and(
                            Expressions.greaterThan("c3", "0"),
                            Expressions.or(Expressions.equal("c1", 1), Expressions.equal("c1", 2))))
                    .option("rewrite-all", "true");
        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                filtered.option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1));
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        // Only files that were not present before the rewrite count as new.
        List<DeleteFile> newDeleteFiles = except(deleteFiles(table), originalDeleteFiles);
        // The description now agrees with the asserted size (it used to claim 4).
        Assertions.assertThat(newDeleteFiles).as("Should have 2 new delete files").hasSize(2);

        List<DeleteFile> expectedRewritten =
            filterFiles(table, originalDeleteFiles, ImmutableList.of(1), ImmutableList.of(2));
        assertLocallySorted(newDeleteFiles);
        checkResult(result, expectedRewritten, newDeleteFiles, 2);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites with a target file size of half the average existing delete file size and checks
     * that eight new delete files are produced without changing table contents.
     */
    @TestTemplate
    public void testRewriteToSmallerTarget() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(expectedDeletes).hasSize(4000);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        RewritePositionDeleteFilesSparkAction rewriteAll =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        // Target half of the average size of the existing delete files.
        String targetFileSize =
            String.valueOf((size(originalDeleteFiles) / originalDeleteFiles.size()) / 2);
        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                rewriteAll.option("target-file-size-bytes", targetFileSize);
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).as("Should have 8 new delete files").hasSize(8);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 4);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites all data files first, then rewrites position deletes, and checks that no delete
     * files or position delete records remain while the table rows are unchanged.
     */
    @TestTemplate
    public void testRemoveDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> initialDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(initialDeletes).hasSize(4000);

        // Rewrite the data files before rewriting the position deletes.
        RewriteDataFilesSparkAction rewriteData =
            (RewriteDataFilesSparkAction)
                SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true");
        rewriteData.execute();

        RewritePositionDeleteFilesSparkAction rewriteDeletes =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewriteDeletes.execute();

        List<DeleteFile> remainingDeleteFiles = deleteFiles(table);
        Assertions.assertThat(remainingDeleteFiles).as("Should have 0 new delete files").hasSize(0);
        assertNotContains(originalDeleteFiles, remainingDeleteFiles);
        assertLocallySorted(remainingDeleteFiles);
        checkResult(result, originalDeleteFiles, remainingDeleteFiles, 4);
        checkSequenceNumbers(table, originalDeleteFiles, remainingDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        Assertions.assertThat(actualDeletes).as("Should be no new position deletes").hasSize(0);
    }

    /**
     * Rewrites the data files of partitions {@code c1 = 0} and {@code c1 = 1} only, then rewrites
     * all position deletes, and checks that only the deletes of partitions 2 and 3 remain.
     */
    @TestTemplate
    public void testSomePartitionsDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> initialDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(initialDeletes).hasSize(4000);

        // Rewrite the data files of partitions 0 and 1 only.
        RewriteDataFilesSparkAction rewriteData =
            (RewriteDataFilesSparkAction)
                SparkActions.get(spark)
                    .rewriteDataFiles(table)
                    .filter(Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1)))
                    .option("rewrite-all", "true");
        rewriteData.execute();

        RewritePositionDeleteFilesSparkAction rewriteDeletes =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewriteDeletes.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).as("Should have 2 new delete files").hasSize(2);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 4);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        // Column 3 of a partitioned delete record is the partition tuple (see deleteRecords);
        // keep only the records whose first partition value is 2 or 3.
        List<Object[]> expectedDeletes =
            initialDeletes.stream()
                .filter(
                    deleteRecord -> {
                        Object[] partition = (Object[]) deleteRecord[3];
                        int c1 = (int) partition[0];
                        return c1 == 2 || c1 == 3;
                    })
                .collect(Collectors.toList());

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites all data files, then rewrites position deletes for partitions {@code c1 = 0} and
     * {@code c1 = 1} only, and checks that no new delete files are written and that only the
     * deletes of partitions 2 and 3 remain.
     */
    @TestTemplate
    public void testRewriteFilterRemoveDangling() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(8);

        table.refresh();
        List<Object[]> expectedRecords = records(table);
        List<Object[]> initialDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(initialDeletes).hasSize(4000);

        RewriteDataFilesSparkAction rewriteData =
            (RewriteDataFilesSparkAction)
                SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true");
        rewriteData.execute();

        RewritePositionDeleteFilesSparkAction filtered =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark)
                    .rewritePositionDeletes(table)
                    .filter(Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1)))
                    .option("rewrite-all", "true");
        RewritePositionDeleteFilesSparkAction rewriteDeletes =
            (RewritePositionDeleteFilesSparkAction)
                filtered.option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1));
        RewritePositionDeleteFiles.Result result = rewriteDeletes.execute();

        List<DeleteFile> newDeleteFiles = except(deleteFiles(table), originalDeleteFiles);
        // The description now agrees with the asserted size (it used to claim 2).
        Assertions.assertThat(newDeleteFiles).as("Should have 0 new delete files").hasSize(0);

        List<DeleteFile> expectedRewritten =
            filterFiles(table, originalDeleteFiles, ImmutableList.of(0), ImmutableList.of(1));
        checkResult(result, expectedRewritten, newDeleteFiles, 2);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        List<Object[]> expectedDeletes =
            filterDeletes(initialDeletes, ImmutableList.of(2), ImmutableList.of(3));
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Starts unpartitioned, adds {@code c1} as a partition field, writes more data and deletes
     * under the new spec, then rewrites all position deletes and checks that six delete files
     * become three without changing table contents.
     */
    @TestTemplate
    public void testPartitionEvolutionAdd() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> unpartitionedDataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, unpartitionedDataFiles);
        Assertions.assertThat(unpartitionedDataFiles).hasSize(2);

        List<DeleteFile> unpartitionedDeleteFiles = deleteFiles(table);
        Assertions.assertThat(unpartitionedDeleteFiles).hasSize(2);

        List<Object[]> unpartitionedDeletes = deleteRecords(table);
        Assertions.assertThat(records(table)).hasSize(2000);
        Assertions.assertThat(unpartitionedDeletes).hasSize(2000);

        // Evolve the spec, then write data and deletes under the new partitioning.
        table.updateSpec().addField("c1").commit();
        writeRecords(table, 2, SCALE, 2);
        List<DataFile> partitionedDataFiles =
            except(TestHelpers.dataFiles(table), unpartitionedDataFiles);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, partitionedDataFiles);
        Assertions.assertThat(partitionedDataFiles).hasSize(2);

        List<DeleteFile> partitionedDeleteFiles =
            except(deleteFiles(table), unpartitionedDeleteFiles);
        Assertions.assertThat(partitionedDeleteFiles).hasSize(4);

        List<Object[]> expectedDeletes = deleteRecords(table);
        List<Object[]> expectedRecords = records(table);
        Assertions.assertThat(expectedDeletes).hasSize(4000);
        Assertions.assertThat(expectedRecords).hasSize(8000);

        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        // All delete files that existed before the rewrite, across both specs.
        List<DeleteFile> originalDeleteFiles =
            Stream.concat(unpartitionedDeleteFiles.stream(), partitionedDeleteFiles.stream())
                .collect(Collectors.toList());

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).as("Should have 3 new delete files").hasSize(3);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 3);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Starts partitioned by {@code c1}, removes that partition field, writes more data and
     * deletes, then rewrites all position deletes and checks that six delete files become three
     * without changing table contents.
     */
    @TestTemplate
    public void testPartitionEvolutionRemove() throws Exception {
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> partitionedDataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, partitionedDataFiles);
        Assertions.assertThat(partitionedDataFiles).hasSize(2);

        List<DeleteFile> partitionedDeleteFiles = deleteFiles(table);
        Assertions.assertThat(partitionedDeleteFiles).hasSize(4);

        // Drop the partition field, then write data and deletes under the new spec.
        table.updateSpec().removeField("c1").commit();
        writeRecords(table, 2, SCALE);
        List<DataFile> unpartitionedDataFiles =
            except(TestHelpers.dataFiles(table), partitionedDataFiles);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, unpartitionedDataFiles);
        Assertions.assertThat(unpartitionedDataFiles).hasSize(2);
        Assertions.assertThat(except(deleteFiles(table), partitionedDeleteFiles)).hasSize(2);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedDeletes).hasSize(4000);
        Assertions.assertThat(expectedRecords).hasSize(8000);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(6);

        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).as("Should have 3 new delete files").hasSize(3);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 3);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Adds a column {@code c4} after the first round of data and deletes, writes data and deletes
     * under the new schema, then rewrites all position deletes and checks that eight delete files
     * become four while the table rows stay the same.
     */
    @TestTemplate
    public void testSchemaEvolution() throws Exception {
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);

        List<DeleteFile> oldSchemaDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldSchemaDeleteFiles).hasSize(4);

        table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
        writeNewSchemaRecords(table, 2, SCALE, 2, 2);

        // Data files written with the new schema are recognised by having an upper bound for c4.
        int newColumnId = table.schema().findField("c4").fieldId();
        List<DataFile> newSchemaDataFiles =
            TestHelpers.dataFiles(table).stream()
                .filter(dataFile -> dataFile.upperBounds().containsKey(newColumnId))
                .collect(Collectors.toList());
        writePosDeletesForFiles(table, 2, DELETES_SCALE, newSchemaDataFiles);

        List<DeleteFile> newSchemaDeleteFiles = except(deleteFiles(table), oldSchemaDeleteFiles);
        Assertions.assertThat(newSchemaDeleteFiles).hasSize(4);

        table.refresh();
        List<Object[]> expectedDeletes = deleteRecords(table);
        List<Object[]> expectedRecords = records(table);
        Assertions.assertThat(expectedDeletes).hasSize(4000);
        Assertions.assertThat(expectedRecords).hasSize(12000);

        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        // All delete files that existed before the rewrite, across both schemas.
        List<DeleteFile> originalDeleteFiles =
            Stream.concat(oldSchemaDeleteFiles.stream(), newSchemaDeleteFiles.stream())
                .collect(Collectors.toList());

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        // The description now agrees with the asserted size (it used to claim 2).
        Assertions.assertThat(rewrittenDeleteFiles).as("Should have 4 new delete files").hasSize(4);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 4);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        assertEquals("Rows must match", expectedRecords, records(table));
    }

    /**
     * Checks that a custom snapshot property set on the action, together with the delete-file
     * summary keys, appears in the summary of the current snapshot after the rewrite.
     */
    @TestTemplate
    public void testSnapshotProperty() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);
        Assertions.assertThat(deleteFiles(table)).hasSize(2);

        RewritePositionDeleteFilesSparkAction withProperty =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).snapshotProperty("key", "value");
        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction) withProperty.option("rewrite-all", "true");
        rewrite.execute();

        Assertions.assertThat(table.currentSnapshot().summary())
            .containsAllEntriesOf(ImmutableMap.of("key", "value"));
        Assertions.assertThat(table.currentSnapshot().summary())
            .containsKeys(
                "added-delete-files",
                "added-position-deletes",
                "changed-partition-count",
                "removed-delete-files",
                "removed-position-deletes",
                "total-data-files",
                "total-delete-files");
    }

    /**
     * Rewrites position deletes on a table with 1010 columns ({@code id} plus {@code c1} to
     * {@code c1009}) bucketed by {@code id}, and checks that both delete files are replaced
     * without changing table contents.
     */
    @TestTemplate
    public void testRewriteManyColumns() throws Exception {
        List<Types.NestedField> fields =
            Lists.newArrayList(Types.NestedField.required(0, "id", Types.LongType.get()));
        List<Types.NestedField> additionalColumns =
            IntStream.range(1, 1010)
                .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
                .collect(Collectors.toList());
        fields.addAll(additionalColumns);
        Schema schema = new Schema(fields);

        PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
        Table table =
            this.validationCatalog.createTable(
                TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());

        // Four rows; every extra column holds the row id rendered as a string.
        Dataset<Row> df =
            spark
                .range(4L)
                .withColumns(
                    IntStream.range(1, 1010)
                        .boxed()
                        .collect(
                            Collectors.toMap(
                                i -> "c" + i, i -> functions.expr("CAST(id as STRING)"))));
        spark
            .createDataFrame(df.rdd(), spark.table(name(table)).schema())
            .coalesce(1)
            .write()
            .format("iceberg")
            .mode("append")
            .save(name(table));

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 1, 1, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);

        List<DeleteFile> originalDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalDeleteFiles).hasSize(2);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(2);
        Assertions.assertThat(expectedDeletes).hasSize(2);

        RewritePositionDeleteFilesSparkAction rewriteAll =
            (RewritePositionDeleteFilesSparkAction)
                SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFilesSparkAction rewrite =
            (RewritePositionDeleteFilesSparkAction)
                rewriteAll.option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1));
        RewritePositionDeleteFiles.Result result = rewrite.execute();

        List<DeleteFile> rewrittenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(rewrittenDeleteFiles).hasSize(2);
        assertNotContains(originalDeleteFiles, rewrittenDeleteFiles);
        assertLocallySorted(rewrittenDeleteFiles);
        checkResult(result, originalDeleteFiles, rewrittenDeleteFiles, 2);
        checkSequenceNumbers(table, originalDeleteFiles, rewrittenDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Creates the test table partitioned by identity of {@code c1} and fills it with records.
     *
     * @param i number of partition values to write (0 inclusive to {@code i} exclusive)
     * @param i2 value passed to {@code repartition} when writing
     * @param i3 number of records written per partition value
     * @return the created table
     */
    private Table createTablePartitioned(int i, int i2, int i3) {
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        Table table = this.validationCatalog.createTable(identifier, SCHEMA, spec, tableProperties());
        // Note the argument order: writeRecords takes the partition count last.
        writeRecords(table, i2, i3, i);
        return table;
    }

    /**
     * Creates the unpartitioned test table and fills it with records.
     *
     * @param i value passed to {@code repartition} when writing
     * @param i2 number of records to write
     * @return the created table
     */
    private Table createTableUnpartitioned(int i, int i2) {
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = this.validationCatalog.createTable(identifier, SCHEMA, spec, tableProperties());
        writeRecords(table, i, i2);
        return table;
    }

    /**
     * Builds the properties every test table is created with: full metrics by default, format
     * version 2, and the parameterized file format as the default write format.
     */
    private Map<String, String> tableProperties() {
        String defaultFormat = this.format.toString();
        return ImmutableMap.of(
            "write.metadata.metrics.default", "full",
            "format-version", "2",
            "write.format.default", defaultFormat);
    }

    /**
     * Writes records for a single partition value by delegating to the four-argument overload
     * with its last argument fixed at 1.
     *
     * @param table table to append to
     * @param i forwarded unchanged as the second argument of the overload
     * @param i2 forwarded unchanged as the third argument of the overload
     */
    private void writeRecords(Table table, int i, int i2) {
        writeRecords(table, i, i2, 1);
    }

    /**
     * Writes records for the partition values {@code 0} (inclusive) to {@code i3} (exclusive),
     * each wrapped in a single-element list.
     *
     * @param table table to append to
     * @param i forwarded to {@code writeRecordsWithPartitions} unchanged
     * @param i2 forwarded to {@code writeRecordsWithPartitions} unchanged
     * @param i3 number of single-value partitions to generate
     */
    private void writeRecords(Table table, int i, int i2, int i3) {
        List<List<Integer>> partitionValues = new ArrayList<>();
        for (int partition = 0; partition < i3; partition++) {
            partitionValues.add(ImmutableList.of(partition));
        }
        writeRecordsWithPartitions(table, i, i2, partitionValues);
    }

    /**
     * Appends {@code i2} records per entry of {@code list} to the table and refreshes it.
     *
     * <p>How a record is built depends on the number of partition fields in the table's spec:
     * with none, all three columns derive from the row index; with one, {@code c1} takes the first
     * partition value; with two, {@code c1} and {@code c2} take the first and second values.
     *
     * @param table table to append to; its spec may have at most two partition fields
     * @param i value passed to {@code repartition} before writing
     * @param i2 number of records generated per entry of {@code list}
     * @param list partition values, one inner list per partition
     */
    private void writeRecordsWithPartitions(Table table, int i, int i2, List<List<Integer>> list) {
        int partitionFieldCount = table.spec().partitionType().fields().size();
        Assertions.assertThat(partitionFieldCount)
            .as("This method currently supports only two columns as partition columns")
            .isLessThanOrEqualTo(2);

        // Typed so the lambda parameters are Integer and List<Integer> rather than Object.
        BiFunction<Integer, List<Integer>, ThreeColumnRecord> recordFactory =
            (rowIndex, partitionValues) -> {
                switch (partitionFieldCount) {
                    case 0:
                        return new ThreeColumnRecord(
                            rowIndex, String.valueOf(rowIndex), String.valueOf(rowIndex));
                    case 1:
                        return new ThreeColumnRecord(
                            partitionValues.get(0), String.valueOf(rowIndex), String.valueOf(rowIndex));
                    case 2:
                        return new ThreeColumnRecord(
                            partitionValues.get(0),
                            String.valueOf(partitionValues.get(1)),
                            String.valueOf(rowIndex));
                    default:
                        throw new ValidationException(
                            "This method currently supports only two columns as partition columns");
                }
            };

        List<ThreeColumnRecord> generated =
            list.stream()
                .flatMap(
                    partitionValues ->
                        IntStream.range(0, i2)
                            .mapToObj(rowIndex -> recordFactory.apply(rowIndex, partitionValues)))
                .collect(Collectors.toList());

        spark
            .createDataFrame(generated, ThreeColumnRecord.class)
            .repartition(i)
            .write()
            .format("iceberg")
            .mode("append")
            .save(name(table));
        table.refresh();
    }

    /**
     * Appends {@link FourColumnRecord}s to the table.
     *
     * <p>For every value {@code v} in {@code [i3, i3 + i4)} and every index {@code k} in
     * {@code [0, i2)} one record {@code (v, str(k), str(k), str(k))} is generated. The records are
     * written through the {@code iceberg} source after repartitioning into {@code i} Spark
     * partitions. The table handle is not refreshed by this method.
     *
     * @param table the table to append to
     * @param i number of Spark partitions used for the write
     * @param i2 number of records generated per first-column value
     * @param i3 first value (inclusive) of the first record column
     * @param i4 how many consecutive first-column values to generate
     */
    private void writeNewSchemaRecords(Table table, int i, int i2, int i3, int i4) {
        List<FourColumnRecord> rows = new ArrayList<>();
        int endExclusive = i3 + i4;
        for (int firstColumn = i3; firstColumn < endExclusive; firstColumn++) {
            for (int rowIndex = 0; rowIndex < i2; rowIndex++) {
                rows.add(new FourColumnRecord(
                        Integer.valueOf(firstColumn),
                        String.valueOf(rowIndex),
                        String.valueOf(rowIndex),
                        String.valueOf(rowIndex)));
            }
        }
        spark.createDataFrame(rows, FourColumnRecord.class)
                .repartition(i)
                .write()
                .format("iceberg")
                .mode("append")
                .save(name(table));
    }

    /**
     * Reads all rows of the table through Spark, ordered by {@code c1}, {@code c2}, {@code c3}.
     *
     * @param table the table to read
     * @return the collected rows after conversion by {@code rowsToJava}
     */
    private List<Object[]> records(Table table) {
        String tableName = name(table);
        return rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(tableName)
                        .sort("c1", "c2", "c3")
                        .collectAsList());
    }

    /**
     * Reads the table's {@code position_deletes} metadata table, ordered by {@code file_path} and
     * {@code pos}.
     *
     * <p>For an unpartitioned current spec the selected columns are {@code file_path, pos, row};
     * otherwise {@code partition} and {@code spec_id} are selected as well.
     *
     * @param table the table whose position deletes are read
     * @return the collected rows after conversion by {@code rowsToJava}
     */
    private List<Object[]> deleteRecords(Table table) {
        String[] trailingColumns;
        if (table.spec().isUnpartitioned()) {
            trailingColumns = new String[] {"pos", "row"};
        } else {
            trailingColumns = new String[] {"pos", "row", "partition", "spec_id"};
        }
        String deletesTable = name(table) + ".position_deletes";
        return rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(deletesTable)
                        .select("file_path", trailingColumns)
                        .sort("file_path", "pos")
                        .collectAsList());
    }

    /**
     * Writes position deletes for the given data files, committing each delete file on its own.
     *
     * <p>Delegates to the five-argument overload with its last flag set to {@code false}.
     *
     * @param table the table to add deletes to
     * @param i forwarded unchanged to the five-argument overload
     * @param i2 forwarded unchanged to the five-argument overload
     * @param list data files to write deletes for
     * @throws IOException if the five-argument overload fails with an I/O error
     */
    private void writePosDeletesForFiles(Table table, int i, int i2, List<DataFile> list) throws IOException {
        final boolean singleCommit = false;
        writePosDeletesForFiles(table, i, i2, list, singleCommit);
    }

    /**
     * Writes position delete files for the given data files and commits them to the table.
     *
     * <p>The data files are grouped by partition. Within each partition, positions
     * {@code 0 .. i2 - 1} of every data file are deleted, and the deletes are spread evenly over
     * {@code i} delete files written to temporary files under {@code temp}. The method asserts that
     * the number of deletes in a partition is divisible by {@code i}.
     *
     * @param table the table to add deletes to
     * @param i number of delete files to produce per partition
     * @param i2 number of positions deleted per data file
     * @param list data files to write deletes for
     * @param z when {@code true}, all delete files are added in a single row delta commit; when
     *     {@code false}, every delete file is committed in its own row delta
     * @throws IOException if a temporary file cannot be created or a delete file cannot be written
     */
    private void writePosDeletesForFiles(Table table, int i, int i2, List<DataFile> list, boolean z) throws IOException {
        Map<StructLike, List<DataFile>> filesByPartition =
                list.stream().collect(Collectors.groupingBy(DataFile::partition));
        List<DeleteFile> deleteFiles = Lists.newArrayListWithCapacity(i * filesByPartition.size());

        for (Map.Entry<StructLike, List<DataFile>> entry : filesByPartition.entrySet()) {
            StructLike partition = entry.getKey();
            List<DataFile> partitionFiles = entry.getValue();

            int deletesForPartition = partitionFiles.size() * i2;
            Assertions.assertThat(deletesForPartition % i)
                    .as("Number of delete files per partition should be evenly divisible by requested deletes per data file times number of data files in this partition")
                    .isZero();
            int deletesPerDeleteFile = deletesForPartition / i;

            List<Pair<CharSequence, Long>> pendingDeletes = Lists.newArrayList();
            for (DataFile dataFile : partitionFiles) {
                for (int pos = 0; pos < i2; pos++) {
                    pendingDeletes.add(Pair.of(dataFile.path(), Long.valueOf(pos)));
                    // Flush as soon as one delete file's worth of positions has accumulated.
                    if (pendingDeletes.size() == deletesPerDeleteFile) {
                        File tempFile = File.createTempFile("junit", null, this.temp.toFile());
                        deleteFiles.add(
                                FileHelpers.writeDeleteFile(table, Files.localOutput(tempFile), partition, pendingDeletes).first());
                        pendingDeletes.clear();
                    }
                }
            }
        }

        if (z) {
            RowDelta rowDelta = table.newRowDelta();
            deleteFiles.forEach(rowDelta::addDeletes);
            rowDelta.commit();
        } else {
            for (DeleteFile deleteFile : deleteFiles) {
                RowDelta rowDelta = table.newRowDelta();
                rowDelta.addDeletes(deleteFile);
                rowDelta.commit();
            }
        }
    }

    /**
     * Lists the delete files referenced by the table's {@code POSITION_DELETES} metadata table.
     *
     * <p>A batch scan is planned on the metadata table and the delete file of every planned task
     * is collected. The planned task iterable is closed once the list has been materialized.
     *
     * @param table the table whose position delete files are listed
     * @return one entry per planned scan task, in planning order
     * @throws java.io.UncheckedIOException if closing the planned tasks fails
     */
    private List<DeleteFile> deleteFiles(Table table) {
        Table positionDeletes =
                MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        // Closing the transformed iterable also releases the scan planning resources behind it.
        try (CloseableIterable<DeleteFile> files = CloseableIterable.transform(
                positionDeletes.newBatchScan().planFiles(),
                scanTask -> ((PositionDeletesScanTask) scanTask).file())) {
            return Lists.newArrayList(files);
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to close position delete scan tasks", e);
        }
    }

    /**
     * Returns the files of the first list whose path does not occur in the second list.
     *
     * <p>Files are compared by the string form of {@code path()} only.
     *
     * @param list candidate files, kept in their original order
     * @param list2 files whose paths are excluded
     * @return a new list with the remaining files
     */
    private <T extends ContentFile<?>> List<T> except(List<T> list, List<T> list2) {
        Set<String> excludedPaths = list2.stream()
                .map(file -> file.path().toString())
                .collect(Collectors.toSet());
        List<T> remaining = new ArrayList<>();
        for (T file : list) {
            if (!excludedPaths.contains(file.path().toString())) {
                remaining.add(file);
            }
        }
        return remaining;
    }

    /**
     * Asserts that the two lists of delete files share no file path.
     *
     * @param list first set of delete files
     * @param list2 second set of delete files; none of their paths may appear in {@code list}
     */
    private void assertNotContains(List<DeleteFile> list, List<DeleteFile> list2) {
        Set<String> firstPaths = list.stream()
                .map(file -> file.path().toString())
                .collect(Collectors.toSet());
        // Keep only the paths of the second list that also occur in the first one.
        Set<String> sharedPaths = list2.stream()
                .map(file -> file.path().toString())
                .filter(firstPaths::contains)
                .collect(Collectors.toSet());
        Assertions.assertThat(sharedPaths).hasSize(0);
    }

    /**
     * Asserts that the rows of every given delete file are ordered by {@code file_path} and, within
     * the same {@code file_path}, by {@code pos}.
     *
     * <p>For each delete file the {@code position_deletes} metadata table is read and restricted to
     * the rows whose {@code delete_file_path} equals that file's path. The collected rows are then
     * compared pairwise with their predecessor.
     *
     * @param list delete files to verify; each must contribute at least one row
     */
    private void assertLocallySorted(List<DeleteFile> list) {
        for (DeleteFile deleteFile : list) {
            Dataset<Row> allDeletes = spark.read().format("iceberg").load("default.test_table.position_deletes");
            // Dataset.filter returns a new Dataset; its result must be used or nothing is filtered.
            List<Row> rows = allDeletes
                    .filter(allDeletes.col("delete_file_path").equalTo(deleteFile.path().toString()))
                    .collectAsList();
            Assertions.assertThat(rows).as("Empty delete file found").isNotEmpty();

            String lastPath = "";
            long lastPos = 0L;
            for (Row row : rows) {
                String path = row.getAs("file_path");
                long pos = row.<Long>getAs("pos");
                if (path.compareTo(lastPath) < 0) {
                    Assertions.fail(String.format("File_path not sorted, Found %s after %s", path, lastPath));
                } else if (path.equals(lastPath)) {
                    Assertions.assertThat(pos).as("Pos not sorted").isGreaterThanOrEqualTo(lastPos);
                }
                // Remember the current row so the next one is compared against it.
                lastPath = path;
                lastPos = pos;
            }
        }
    }

    /**
     * Derives the two-part name used to address the table from Spark.
     *
     * <p>The table's name is split on dots and must consist of exactly three parts; the first part
     * is dropped.
     *
     * @param table the table whose name is converted
     * @return the second and third name parts joined by a dot
     */
    private String name(Table table) {
        String[] parts = table.name().split("\\.");
        Assertions.assertThat(parts).hasSize(3);
        return parts[1] + "." + parts[2];
    }

    /**
     * Sums the sizes of the given delete files.
     *
     * @param list delete files to measure
     * @return the total of {@code fileSizeInBytes()} over all files, {@code 0} for an empty list
     */
    private long size(List<DeleteFile> list) {
        long totalBytes = 0L;
        for (DeleteFile deleteFile : list) {
            totalBytes += deleteFile.fileSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Keeps the delete rows whose partition matches at least one of the expected partitions.
     *
     * <p>Each row is expected to carry its partition as an {@code Object[]} at index 3, which is
     * where {@code deleteRecords} places the {@code partition} column for partitioned tables. The
     * surviving rows are returned in the order defined by {@code sorted}.
     *
     * @param list delete rows to filter
     * @param listArr expected partition value lists; when none are given, no row is kept
     * @return the matching rows, sorted
     */
    private List<Object[]> filterDeletes(List<Object[]> list, List<?>... listArr) {
        Stream<Object[]> matching = list.stream().filter(deleteRow -> {
            Object[] partition = (Object[]) deleteRow[3];
            // anyMatch is well-defined for zero expected partitions, unlike reduce(...).get().
            return Arrays.stream(listArr).anyMatch(expected -> match(partition, expected));
        });
        return sorted(matching).collect(Collectors.toList());
    }

    /**
     * Checks whether the leading elements of a partition tuple equal the expected values.
     *
     * <p>Only the first {@code list.size()} elements of {@code objArr} are compared, so any extra
     * trailing elements in the array are ignored. Values are compared with
     * {@link Objects#equals(Object, Object)}, not by reference, so equal boxed numbers or strings
     * held in distinct objects match.
     *
     * @param objArr actual partition values
     * @param list expected values for the leading positions
     * @return {@code true} if every expected value equals the array element at the same index
     *     (trivially {@code true} for an empty list); {@code false} if any differs or the list is
     *     longer than the array
     */
    private boolean match(Object[] objArr, List<?> list) {
        if (list.size() > objArr.length) {
            // More expected values than actual ones can never match.
            return false;
        }
        return IntStream.range(0, list.size())
                .allMatch(index -> Objects.equals(objArr[index], list.get(index)));
    }

    /**
     * Orders rows by their first element (a {@code String}) and then by their second element
     * (a {@code Long}).
     *
     * @param stream rows whose element 0 is a {@code String} and element 1 is a {@code Long}
     * @return the same rows as a sorted stream
     */
    private Stream<Object[]> sorted(Stream<Object[]> stream) {
        return stream.sorted((left, right) -> {
            String leftPath = (String) left[0];
            String rightPath = (String) right[0];
            int byPath = leftPath.compareTo(rightPath);
            if (byPath != 0) {
                return byPath;
            }
            long leftPos = (Long) left[1];
            long rightPos = (Long) right[1];
            return Long.compare(leftPos, rightPos);
        });
    }

    /**
     * Keeps the delete files whose partition equals one of the expected partitions.
     *
     * <p>Every expected value list is turned into a {@link PartitionData}. Its struct type is the
     * first partition type among the table's specs that has as many fields as the list has values.
     *
     * @param table the table whose partition specs supply the partition types
     * @param list delete files to filter
     * @param listArr expected partition value lists
     * @return the delete files whose partition equals one of the built partitions
     * @throws java.util.NoSuchElementException if no spec has a partition type with the same number
     *     of fields as one of the value lists
     */
    private List<DeleteFile> filterFiles(Table table, List<DeleteFile> list, List<?>... listArr) {
        List<Types.StructType> partitionTypes = table.specs().values().stream()
                .map(spec -> spec.partitionType())
                .collect(Collectors.toList());

        List<PartitionData> expectedPartitions = Arrays.stream(listArr)
                .map(values -> {
                    Types.StructType partitionType = partitionTypes.stream()
                            .filter(type -> type.fields().size() == values.size())
                            .findFirst()
                            .orElseThrow(() -> new java.util.NoSuchElementException(
                                    "No partition spec with " + values.size() + " fields for partition values " + values));
                    PartitionData partition = new PartitionData(partitionType);
                    for (int pos = 0; pos < values.size(); pos++) {
                        partition.set(pos, values.get(pos));
                    }
                    return partition;
                })
                .collect(Collectors.toList());

        return list.stream()
                .filter(deleteFile -> expectedPartitions.stream()
                        .anyMatch(expected -> deleteFile.partition().equals(expected)))
                .collect(Collectors.toList());
    }

    /**
     * Verifies the counters of a rewrite result against the expected files.
     *
     * <p>File counts and byte totals are checked both on the overall result and as sums over the
     * individual rewrite group results; the number of groups is checked as well.
     *
     * @param result the result returned by the rewrite action
     * @param list delete files expected to have been rewritten
     * @param list2 delete files expected to have been added
     * @param i expected number of rewrite groups
     */
    private void checkResult(RewritePositionDeleteFiles.Result result, List<DeleteFile> list, List<DeleteFile> list2, int i) {
        // Totals reported on the result itself.
        Assertions.assertThat(list.size())
                .as("Expected rewritten delete file count does not match")
                .isEqualTo(result.rewrittenDeleteFilesCount());
        Assertions.assertThat(list2.size())
                .as("Expected new delete file count does not match")
                .isEqualTo(result.addedDeleteFilesCount());
        Assertions.assertThat(size(list))
                .as("Expected rewritten delete byte count does not match")
                .isEqualTo(result.rewrittenBytesCount());
        Assertions.assertThat(size(list2))
                .as("Expected new delete byte count does not match")
                .isEqualTo(result.addedBytesCount());
        Assertions.assertThat(i)
                .as("Expected rewrite group count does not match")
                .isEqualTo(result.rewriteResults().size());

        // The same totals, summed over the per-group results.
        int rewrittenFilesInGroups = result.rewriteResults().stream()
                .mapToInt(group -> group.rewrittenDeleteFilesCount())
                .sum();
        Assertions.assertThat(list.size())
                .as("Expected rewritten delete file count in all groups to match")
                .isEqualTo(rewrittenFilesInGroups);

        int addedFilesInGroups = result.rewriteResults().stream()
                .mapToInt(group -> group.addedDeleteFilesCount())
                .sum();
        Assertions.assertThat(list2.size())
                .as("Expected added delete file count in all groups to match")
                .isEqualTo(addedFilesInGroups);

        long rewrittenBytesInGroups = result.rewriteResults().stream()
                .mapToLong(group -> group.rewrittenBytesCount())
                .sum();
        Assertions.assertThat(size(list))
                .as("Expected rewritten delete bytes in all groups to match")
                .isEqualTo(rewrittenBytesInGroups);

        long addedBytesInGroups = result.rewriteResults().stream()
                .mapToLong(group -> group.addedBytesCount())
                .sum();
        Assertions.assertThat(size(list2))
                .as("Expected added delete bytes in all groups to match")
                .isEqualTo(addedBytesInGroups);
    }

    /**
     * Asserts that added delete files carry the highest data sequence number of the delete files
     * they replace in the same partition.
     *
     * <p>Both lists are grouped by partition. For every partition that has rewritten files, each
     * added file in that partition must have a data sequence number equal to the maximum over the
     * rewritten files. Partitions without added files are skipped.
     *
     * @param table the table the delete files belong to
     * @param list delete files that were rewritten
     * @param list2 delete files that were added by the rewrite
     */
    private void checkSequenceNumbers(Table table, List<DeleteFile> list, List<DeleteFile> list2) {
        StructLikeMap<List<DeleteFile>> rewrittenByPartition = groupPerPartition(table, list);
        StructLikeMap<List<DeleteFile>> addedByPartition = groupPerPartition(table, list2);

        for (StructLike partition : rewrittenByPartition.keySet()) {
            // Every group holds at least one file, so max() is always present.
            long maxRewrittenSequence = rewrittenByPartition.get(partition).stream()
                    .mapToLong(deleteFile -> deleteFile.dataSequenceNumber())
                    .max()
                    .getAsLong();

            List<DeleteFile> addedInPartition = addedByPartition.get(partition);
            if (addedInPartition == null) {
                continue;
            }
            for (DeleteFile added : addedInPartition) {
                Assertions.assertThat(added.dataSequenceNumber())
                        .as("Sequence number should be max of rewritten set")
                        .isEqualTo(maxRewrittenSequence);
            }
        }
    }

    /**
     * Groups delete files by their partition.
     *
     * <p>The map is keyed with the partition type returned by {@code Partitioning.partitionType}
     * for the table.
     *
     * @param table the table the delete files belong to
     * @param list delete files to group
     * @return a map from partition to the delete files in that partition, in input order
     */
    private StructLikeMap<List<DeleteFile>> groupPerPartition(Table table, List<DeleteFile> list) {
        StructLikeMap<List<DeleteFile>> filesByPartition = StructLikeMap.create(Partitioning.partitionType(table));
        for (DeleteFile deleteFile : list) {
            StructLike partition = deleteFile.partition();
            List<DeleteFile> bucket = filesByPartition.get(partition);
            if (bucket == null) {
                // First file seen for this partition: register a fresh bucket.
                bucket = Lists.newArrayList();
                filesByPartition.put(partition, bucket);
            }
            bucket.add(deleteFile);
        }
        return filesByPartition;
    }
}
