package org.apache.iceberg.spark.extensions;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.actions.RewritePositionDeleteFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

/**
 * Tests {@code rewritePositionDeletes} on tables whose position delete files only reference data
 * files that were already replaced by a data-file rewrite ("dangling" deletes).
 *
 * <p>Each test creates a format-version 2 table partitioned by a column of one type (or by a
 * partition transform) and appends one data file per partition value. It then compacts the data
 * files and writes position deletes against the pre-compaction files. Rewriting position deletes is
 * expected to remove every delete file, add none, and leave the table rows unchanged.
 */
public class TestRewritePositionDeleteFiles extends SparkExtensionsTestBase {
    private static final Map<String, String> CATALOG_PROPS =
        ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false");
    private static final String PARTITION_COL = "partition_col";
    /** Default number of data files appended per test (one per partition value). */
    private static final int NUM_DATA_FILES = 5;
    /** Number of rows written into each appended data file. */
    private static final int ROWS_PER_DATA_FILE = 100;
    /** Number of position delete files written for each partition. */
    private static final int DELETE_FILES_PER_PARTITION = 2;
    /** Number of position deletes written against each data file. */
    private static final int DELETE_FILE_SIZE = 10;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /** Single parameter set: the Hive catalog with caching disabled. */
    @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[][] {
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS
            }
        };
    }

    public TestRewritePositionDeleteFiles(
            String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    @After
    public void cleanup() {
        sql("DROP TABLE IF EXISTS %s", tableName);
    }

    @Test
    public void testDatePartition() throws Exception {
        createTable("date");
        Date baseDate = Date.valueOf("2023-01-01");
        insertData(num -> Date.valueOf(baseDate.toLocalDate().plusDays(num)));
        testDanglingDelete();
    }

    @Test
    public void testBooleanPartition() throws Exception {
        createTable("boolean");
        // Only two partition values exist (true / false), so exactly two files are appended.
        int numPartitions = 2;
        insertData(num -> num % 2 == 0, numPartitions);
        testDanglingDelete(numPartitions);
    }

    @Test
    public void testTimestampPartition() throws Exception {
        createTable("timestamp");
        Timestamp baseTime = Timestamp.valueOf("2023-01-01 15:30:00");
        insertData(num -> Timestamp.valueOf(baseTime.toLocalDateTime().plusDays(num)));
        testDanglingDelete();
    }

    @Test
    public void testTimestampNtz() throws Exception {
        createTable("timestamp_ntz");
        LocalDateTime baseTime = Timestamp.valueOf("2023-01-01 15:30:00").toLocalDateTime();
        insertData(num -> baseTime.plusDays(num));
        testDanglingDelete();
    }

    @Test
    public void testBytePartition() throws Exception {
        createTable("byte");
        insertData(num -> num);
        testDanglingDelete();
    }

    @Test
    public void testDecimalPartition() throws Exception {
        createTable("decimal(18, 10)");
        BigDecimal baseDecimal = new BigDecimal("1.0");
        insertData(num -> baseDecimal.add(new BigDecimal(num)));
        testDanglingDelete();
    }

    @Test
    public void testBinaryPartition() throws Exception {
        createTable("binary");
        insertData(num -> ByteBuffer.allocate(4).putInt(num).array());
        testDanglingDelete();
    }

    @Test
    public void testCharPartition() throws Exception {
        createTable("char(10)");
        insertData(num -> num.toString());
        testDanglingDelete();
    }

    @Test
    public void testVarcharPartition() throws Exception {
        createTable("varchar(10)");
        insertData(num -> num.toString());
        testDanglingDelete();
    }

    @Test
    public void testIntPartition() throws Exception {
        createTable("int");
        insertData(num -> num);
        testDanglingDelete();
    }

    @Test
    public void testDaysPartitionTransform() throws Exception {
        createTable("timestamp", PARTITION_COL, String.format("days(%s)", PARTITION_COL));
        Timestamp baseTime = Timestamp.valueOf("2023-01-01 15:30:00");
        insertData(num -> Timestamp.valueOf(baseTime.toLocalDateTime().plusDays(num)));
        testDanglingDelete();
    }

    @Test
    public void testNullTransform() throws Exception {
        createTable("int");
        // Only two partition values exist (null for the first file, 1 for the rest).
        int numPartitions = 2;
        insertData(num -> num == 0 ? null : 1, numPartitions);
        testDanglingDelete(numPartitions);
    }

    @Test
    public void testPartitionColWithDot() throws Exception {
        createTable("int", "`partition.col`", "`partition.col`");
        insertData("`partition.col`", num -> num, NUM_DATA_FILES);
        testDanglingDelete("`partition.col`", NUM_DATA_FILES);
    }

    private void testDanglingDelete() throws Exception {
        testDanglingDelete(NUM_DATA_FILES);
    }

    private void testDanglingDelete(int numDataFiles) throws Exception {
        testDanglingDelete(PARTITION_COL, numDataFiles);
    }

    /**
     * Creates dangling position deletes and checks that rewriting position deletes removes them.
     *
     * @param partitionCol column used (with {@code id}) to order rows when comparing table contents
     * @param numDataFiles expected number of data files; each is assumed to be its own partition and
     *     therefore its own rewrite group
     */
    private void testDanglingDelete(String partitionCol, int numDataFiles) throws Exception {
        Table table = Spark3Util.loadIcebergTable(spark, tableName);

        List<DataFile> dataFiles = dataFiles(table);
        Assertions.assertThat(dataFiles).hasSize(numDataFiles);

        // Compact first, so the deletes written below target the pre-compaction data files.
        RewriteDataFilesSparkAction compactData =
            SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true");
        compactData.execute();

        writePosDeletesForFiles(table, dataFiles);
        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assertions.assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

        List<Object[]> expectedRecords = records(tableName, partitionCol);

        RewritePositionDeleteFilesSparkAction rewriteDeletes =
            SparkActions.get(spark).rewritePositionDeletes(table).option("rewrite-all", "true");
        RewritePositionDeleteFiles.Result result = rewriteDeletes.execute();

        Assertions.assertThat(deleteFiles(table)).as("Remaining dangling deletes").isEmpty();
        checkResult(result, deleteFiles, Lists.newArrayList(), numDataFiles);
        assertEquals("Rows must match", expectedRecords, records(tableName, partitionCol));
    }

    private void createTable(String partitionType) {
        createTable(partitionType, PARTITION_COL, PARTITION_COL);
    }

    /**
     * Creates the test table as a format-version 2 Iceberg table.
     *
     * @param partitionType SQL type of the partition column
     * @param partitionCol name of the partition column (may be back-quoted)
     * @param partitionTransform expression placed in {@code PARTITIONED BY (...)}
     */
    private void createTable(String partitionType, String partitionCol, String partitionTransform) {
        sql(
            "CREATE TABLE %s (id long, %s %s, c1 string, c2 string) USING iceberg PARTITIONED BY (%s) TBLPROPERTIES('format-version'='2')",
            tableName, partitionCol, partitionType, partitionTransform);
    }

    private void insertData(Function<Integer, ?> partitionValue) throws Exception {
        insertData(partitionValue, NUM_DATA_FILES);
    }

    private void insertData(Function<Integer, ?> partitionValue, int numFiles) throws Exception {
        insertData(PARTITION_COL, partitionValue, numFiles);
    }

    /**
     * Appends {@code numFiles} data files. File {@code k} holds {@link #ROWS_PER_DATA_FILE} rows
     * whose partition column is {@code partitionValue.apply(k)}.
     */
    private void insertData(String partitionCol, Function<Integer, ?> partitionValue, int numFiles)
            throws Exception {
        for (int fileIdx = 0; fileIdx < numFiles; fileIdx++) {
            Dataset<Row> rows =
                spark
                    .range(0, ROWS_PER_DATA_FILE)
                    .withColumn(partitionCol, functions.lit(partitionValue.apply(fileIdx)))
                    .withColumn("c1", functions.expr("CAST(id AS STRING)"))
                    .withColumn("c2", functions.expr("CAST(id AS STRING)"));
            appendAsFile(rows);
        }
    }

    /**
     * Appends {@code df} to the table as a single data file (via {@code coalesce(1)}), using the
     * table's schema.
     */
    private void appendAsFile(Dataset<Row> df) throws Exception {
        spark
            .createDataFrame(df.rdd(), spark.table(tableName).schema())
            .coalesce(1)
            .writeTo(tableName)
            .append();
    }

    /**
     * Writes {@link #DELETE_FILE_SIZE} position deletes (positions {@code 0..DELETE_FILE_SIZE-1})
     * for every given data file. Each partition's deletes are split evenly across {@link
     * #DELETE_FILES_PER_PARTITION} delete files, and all of them are committed in one row delta.
     */
    private void writePosDeletesForFiles(Table table, List<DataFile> files) throws IOException {
        Map<StructLike, List<DataFile>> filesByPartition =
            files.stream().collect(Collectors.groupingBy(DataFile::partition));
        List<DeleteFile> deleteFiles =
            new ArrayList<>(DELETE_FILES_PER_PARTITION * filesByPartition.size());

        for (Map.Entry<StructLike, List<DataFile>> entry : filesByPartition.entrySet()) {
            StructLike partition = entry.getKey();
            List<DataFile> partitionFiles = entry.getValue();

            int deletesForPartition = partitionFiles.size() * DELETE_FILE_SIZE;
            // The deletes must divide evenly into DELETE_FILES_PER_PARTITION files. Otherwise the
            // remainder is never flushed by the loop below and is silently dropped.
            Assertions.assertThat(deletesForPartition % DELETE_FILES_PER_PARTITION)
                .as("Number of deletes per partition modulo number of delete files per partition")
                .isEqualTo(0);
            int deletesPerFile = deletesForPartition / DELETE_FILES_PER_PARTITION;

            int pending = 0;
            List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
            for (DataFile dataFile : partitionFiles) {
                for (int pos = 0; pos < DELETE_FILE_SIZE; pos++) {
                    deletes.add(Pair.of(dataFile.location(), (long) pos));
                    pending++;
                    if (pending == deletesPerFile) {
                        // Flush the accumulated deletes into a new file and start the next batch.
                        OutputFile output = Files.localOutput(temp.newFile());
                        deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
                        pending = 0;
                        deletes.clear();
                    }
                }
            }
        }

        RowDelta rowDelta = table.newRowDelta();
        deleteFiles.forEach(rowDelta::addDeletes);
        rowDelta.commit();
    }

    /**
     * Writes {@code deletes} (data file path, row position) into a single position delete file for
     * {@code partition}. The file uses the table's default write format.
     */
    private DeleteFile writeDeleteFile(
            Table table, OutputFile out, StructLike partition, List<Pair<CharSequence, Long>> deletes)
            throws IOException {
        FileFormat format = defaultFormat(table.properties());
        GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
        PositionDeleteWriter<Record> writer =
            factory.newPosDeleteWriter(encrypt(out), format, partition);
        PositionDelete<Record> posDelete = PositionDelete.create();
        try (PositionDeleteWriter<Record> toClose = writer) {
            for (Pair<CharSequence, Long> delete : deletes) {
                toClose.write(posDelete.set(delete.first(), delete.second(), null));
            }
        }
        // Read the resulting delete file only after the writer has been closed.
        return writer.toDeleteFile();
    }

    private static EncryptedOutputFile encrypt(OutputFile out) {
        return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
    }

    /** Returns {@code write.format.default} from the table properties, or Parquet when unset. */
    private static FileFormat defaultFormat(Map<String, String> properties) {
        return FileFormat.fromString(properties.getOrDefault("write.format.default", "parquet"));
    }

    /** Reads all rows of {@code table}, ordered by {@code orderCol} and then by {@code id}. */
    private List<Object[]> records(String table, String orderCol) {
        return rowsToJava(
            spark.read().format("iceberg").load(table).sort(orderCol, "id").collectAsList());
    }

    /** Total size in bytes of the given delete files. */
    private long size(List<DeleteFile> files) {
        return files.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
    }

    /** Lists the table's current data files, with column stats; the scan iterable is closed. */
    private List<DataFile> dataFiles(Table table) throws IOException {
        TableScan scan = table.newScan().includeColumnStats();
        try (CloseableIterable<DataFile> files =
                CloseableIterable.transform(scan.planFiles(), task -> task.file())) {
            return Lists.newArrayList(files);
        }
    }

    /** Lists the table's live position delete files via the position_deletes metadata table. */
    private List<DeleteFile> deleteFiles(Table table) throws IOException {
        Table deletesTable =
            MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        try (CloseableIterable<DeleteFile> files =
                CloseableIterable.transform(
                    deletesTable.newBatchScan().planFiles(),
                    task -> ((PositionDeletesScanTask) task).file())) {
            return Lists.newArrayList(files);
        }
    }

    /**
     * Checks the action's aggregate counters and per-group counters against the expected files.
     *
     * @param result result of the position delete rewrite
     * @param rewrittenDeletes delete files expected to have been rewritten (removed)
     * @param newDeletes delete files expected to have been added
     * @param expectedGroups expected number of rewrite groups
     */
    private void checkResult(
            RewritePositionDeleteFiles.Result result,
            List<DeleteFile> rewrittenDeletes,
            List<DeleteFile> newDeletes,
            int expectedGroups) {
        Assertions.assertThat(result.rewrittenDeleteFilesCount())
            .as("Rewritten delete files")
            .isEqualTo(rewrittenDeletes.size());
        Assertions.assertThat(result.addedDeleteFilesCount())
            .as("Added delete files")
            .isEqualTo(newDeletes.size());
        Assertions.assertThat(result.rewrittenBytesCount())
            .as("Rewritten delete bytes")
            .isEqualTo(size(rewrittenDeletes));
        Assertions.assertThat(result.addedBytesCount())
            .as("New Delete byte count")
            .isEqualTo(size(newDeletes));

        Assertions.assertThat(result.rewriteResults())
            .as("Rewritten group count")
            .hasSize(expectedGroups);
        Assertions.assertThat(
                result.rewriteResults().stream().mapToInt(r -> r.rewrittenDeleteFilesCount()).sum())
            .as("Rewritten delete file count in all groups")
            .isEqualTo(rewrittenDeletes.size());
        Assertions.assertThat(
                result.rewriteResults().stream().mapToInt(r -> r.addedDeleteFilesCount()).sum())
            .as("Added delete file count in all groups")
            .isEqualTo(newDeletes.size());
        Assertions.assertThat(
                result.rewriteResults().stream().mapToLong(r -> r.rewrittenBytesCount()).sum())
            .as("Rewritten delete bytes in all groups")
            .isEqualTo(size(rewrittenDeletes));
        Assertions.assertThat(
                result.rewriteResults().stream().mapToLong(r -> r.addedBytesCount()).sum())
            .as("Added delete bytes in all groups")
            .isEqualTo(size(newDeletes));
    }
}
