package org.apache.iceberg.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.UnmodifiableIterator;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/TestFileRewriteCoordinator.class */
public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
    public TestFileRewriteCoordinator(String str, String str2, Map<String, String> map) {
        super(str, str2, map);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", this.tableName);
    }

    @Test
    public void testBinPackRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", this.tableName);
        Dataset<Row> newDF = newDF(1000);
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should produce 4 snapshots", 4L, Iterables.size(loadTable.snapshots()));
        long sum = spark.read().format("iceberg").load(tableName(this.tableIdent.name() + ".files")).select("file_size_in_bytes", new String[0]).as(Encoders.LONG()).collectAsList().stream().mapToLong(l -> {
            return l.longValue();
        }).sum() / r0.size();
        CloseableIterable planFiles = loadTable.newScan().planFiles();
        Throwable th = null;
        try {
            try {
                String uuid = UUID.randomUUID().toString();
                FileScanTaskSetManager fileScanTaskSetManager = FileScanTaskSetManager.get();
                fileScanTaskSetManager.stageTasks(loadTable, uuid, Lists.newArrayList(planFiles));
                spark.read().format("iceberg").option("file-scan-task-set-id", uuid).option("split-size", Long.toString(sum * 2)).option("file-open-cost", "0").load(this.tableName).writeTo(this.tableName).option("rewritten-file-scan-task-set-id", uuid).append();
                loadTable.newRewrite().rewriteFiles((Set) fileScanTaskSetManager.fetchTasks(loadTable, uuid).stream().map((v0) -> {
                    return v0.file();
                }).collect(Collectors.toSet()), FileRewriteCoordinator.get().fetchNewDataFiles(loadTable, uuid)).commit();
                if (planFiles != null) {
                    if (0 != 0) {
                        try {
                            planFiles.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        planFiles.close();
                    }
                }
                loadTable.refresh();
                Map summary = loadTable.currentSnapshot().summary();
                Assert.assertEquals("Deleted files count must match", "4", summary.get("deleted-data-files"));
                Assert.assertEquals("Added files count must match", "2", summary.get("added-data-files"));
                Assert.assertEquals("Row count must match", 4000L, scalarSql("SELECT count(*) FROM %s", this.tableName));
            } finally {
            }
        } catch (Throwable th3) {
            if (planFiles != null) {
                if (th != null) {
                    try {
                        planFiles.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    planFiles.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSortRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", this.tableName);
        Dataset<Row> newDF = newDF(1000);
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should produce 4 snapshots", 4L, Iterables.size(loadTable.snapshots()));
        CloseableIterable planFiles = loadTable.newScan().planFiles();
        Throwable th = null;
        try {
            try {
                String uuid = UUID.randomUUID().toString();
                FileScanTaskSetManager fileScanTaskSetManager = FileScanTaskSetManager.get();
                fileScanTaskSetManager.stageTasks(loadTable, uuid, Lists.newArrayList(planFiles));
                Dataset load = spark.read().format("iceberg").option("file-scan-task-set-id", uuid).option("split-size", "134217728").option("file-open-cost", "134217728").load(this.tableName);
                withSQLConf(ImmutableMap.of("spark.sql.shuffle.partitions", "2", "spark.sql.adaptive.enabled", "false"), () -> {
                    try {
                        load.sort("id", new String[0]).writeTo(this.tableName).option("rewritten-file-scan-task-set-id", uuid).append();
                    } catch (NoSuchTableException e) {
                        throw new RuntimeException("Could not replace files", e);
                    }
                });
                loadTable.newRewrite().rewriteFiles((Set) fileScanTaskSetManager.fetchTasks(loadTable, uuid).stream().map((v0) -> {
                    return v0.file();
                }).collect(Collectors.toSet()), FileRewriteCoordinator.get().fetchNewDataFiles(loadTable, uuid)).commit();
                if (planFiles != null) {
                    if (0 != 0) {
                        try {
                            planFiles.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        planFiles.close();
                    }
                }
                loadTable.refresh();
                Map summary = loadTable.currentSnapshot().summary();
                Assert.assertEquals("Deleted files count must match", "4", summary.get("deleted-data-files"));
                Assert.assertEquals("Added files count must match", "2", summary.get("added-data-files"));
                Assert.assertEquals("Row count must match", 4000L, scalarSql("SELECT count(*) FROM %s", this.tableName));
            } finally {
            }
        } catch (Throwable th3) {
            if (planFiles != null) {
                if (th != null) {
                    try {
                        planFiles.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    planFiles.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCommitMultipleRewrites() throws NoSuchTableException, IOException {
        Throwable th;
        String uuid;
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", this.tableName);
        Dataset<Row> newDF = newDF(1000);
        newDF.coalesce(1).writeTo(this.tableName).append();
        newDF.coalesce(1).writeTo(this.tableName).append();
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        String uuid2 = UUID.randomUUID().toString();
        long snapshotId = loadTable.currentSnapshot().snapshotId();
        FileScanTaskSetManager fileScanTaskSetManager = FileScanTaskSetManager.get();
        CloseableIterable planFiles = loadTable.newScan().planFiles();
        Throwable th2 = null;
        try {
            try {
                fileScanTaskSetManager.stageTasks(loadTable, uuid2, Lists.newArrayList(planFiles));
                if (planFiles != null) {
                    if (0 != 0) {
                        try {
                            planFiles.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        planFiles.close();
                    }
                }
                newDF.coalesce(1).writeTo(this.tableName).append();
                newDF.coalesce(1).writeTo(this.tableName).append();
                loadTable.refresh();
                uuid = UUID.randomUUID().toString();
                planFiles = loadTable.newScan().appendsAfter(snapshotId).planFiles();
                th = null;
            } finally {
            }
            try {
                try {
                    fileScanTaskSetManager.stageTasks(loadTable, uuid, Lists.newArrayList(planFiles));
                    if (planFiles != null) {
                        if (0 != 0) {
                            try {
                                planFiles.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            planFiles.close();
                        }
                    }
                    ImmutableSet of = ImmutableSet.of(uuid2, uuid);
                    UnmodifiableIterator it = of.iterator();
                    while (it.hasNext()) {
                        String str = (String) it.next();
                        spark.read().format("iceberg").option("file-scan-task-set-id", str).option("split-size", Long.MAX_VALUE).load(this.tableName).writeTo(this.tableName).option("rewritten-file-scan-task-set-id", str).append();
                    }
                    FileRewriteCoordinator fileRewriteCoordinator = FileRewriteCoordinator.get();
                    loadTable.newRewrite().rewriteFiles((Set) of.stream().flatMap(str2 -> {
                        return fileScanTaskSetManager.fetchTasks(loadTable, str2).stream();
                    }).map((v0) -> {
                        return v0.file();
                    }).collect(Collectors.toSet()), (Set) of.stream().flatMap(str3 -> {
                        return fileRewriteCoordinator.fetchNewDataFiles(loadTable, str3).stream();
                    }).collect(Collectors.toSet())).commit();
                    loadTable.refresh();
                    Assert.assertEquals("Should produce 5 snapshots", 5L, Iterables.size(loadTable.snapshots()));
                    Map summary = loadTable.currentSnapshot().summary();
                    Assert.assertEquals("Deleted files count must match", "4", summary.get("deleted-data-files"));
                    Assert.assertEquals("Added files count must match", "2", summary.get("added-data-files"));
                    Assert.assertEquals("Row count must match", 4000L, scalarSql("SELECT count(*) FROM %s", this.tableName));
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    private Dataset<Row> newDF(int i) {
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
        for (int i2 = 0; i2 < i; i2++) {
            newArrayListWithExpectedSize.add(new SimpleRecord(Integer.valueOf(i2), Integer.toString(i2)));
        }
        return spark.createDataFrame(newArrayListWithExpectedSize, SimpleRecord.class);
    }
}
