package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.ValidationHelpers;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

/**
 * Tests for the Spark {@code rewriteManifests} action ({@link SparkActions#rewriteManifests}).
 *
 * <p>The class is parameterized by {@link #parameters()}: every test that creates its table through
 * {@link #createTable(PartitionSpec)} runs once with snapshot-id inheritance enabled ({@code "true"})
 * and once with it disabled ({@code "false"}).
 */
@RunWith(Parameterized.class)
public class TestRewriteManifestsAction extends SparkTestBase {
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    private static final Schema SCHEMA = new Schema(
        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
        Types.NestedField.optional(2, "c2", Types.StringType.get()),
        Types.NestedField.optional(3, "c3", Types.StringType.get()));

    // Table property whose value is supplied by the parameterized runner.
    private static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled";
    // Table property the tests lower to force the rewrite to emit several smaller manifests.
    private static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes";

    private final String snapshotIdInheritanceEnabled;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    // Fresh per-test table location, assigned in setupTableLocation().
    private String tableLocation;

    @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
    public static Object[] parameters() {
        return new Object[] {"true", "false"};
    }

    /**
     * @param snapshotIdInheritanceEnabled value ("true"/"false") stored in the
     *     snapshot-id-inheritance table property of tables created by {@link #createTable}
     */
    public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
        this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
    }

    @Before
    public void setupTableLocation() throws Exception {
        this.tableLocation = temp.newFolder().toURI().toString();
    }

    @Test
    public void testRewriteManifestsEmptyTable() throws IOException {
        Table table = createTable(PartitionSpec.unpartitioned());
        Assert.assertNull("Table must be empty", table.currentSnapshot());

        SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> true)
            .stagingLocation(temp.newFolder().toString())
            .execute();

        Assert.assertNull("Table must stay empty", table.currentSnapshot());
    }

    @Test
    public void testRewriteSmallManifestsNonPartitionedTable() {
        Table table = createTable(PartitionSpec.unpartitioned());

        List<ThreeColumnRecord> records1 = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "AAAA"),
            new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
        writeRecords(records1);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(
            new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
            new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD"));
        writeRecords(records2);

        table.refresh();
        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

        RewriteManifests.Result result = SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> true)
            .execute();

        Assert.assertEquals("Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
        Assert.assertEquals("Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
        assertOnlyExistingFiles(newManifests.get(0), 4);

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records1);
        expectedRecords.addAll(records2);
        Assert.assertEquals("Rows must match", expectedRecords, readSortedRecords());
    }

    @Test
    public void testRewriteManifestsWithCommitStateUnknownException() {
        Table table = createTable(PartitionSpec.unpartitioned());

        List<ThreeColumnRecord> records1 = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "AAAA"),
            new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
        writeRecords(records1);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(
            new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
            new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD"));
        writeRecords(records2);

        table.refresh();
        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

        SparkActions actions = SparkActions.get();

        // The spied operation performs the real commit and then throws, simulating a commit that
        // succeeded although the caller could not learn its outcome.
        org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
        org.apache.iceberg.RewriteManifests spyNewRewriteManifests = Mockito.spy(newRewriteManifests);
        Mockito.doAnswer(invocation -> {
            newRewriteManifests.commit();
            throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
        }).when(spyNewRewriteManifests).commit();

        // Route the action's table.rewriteManifests() call to the spied operation above.
        Table spyTable = Mockito.spy(table);
        Mockito.when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);

        AssertHelpers.assertThrowsCause(
            "Should throw a Commit State Unknown Exception",
            RuntimeException.class,
            "Datacenter on Fire",
            () -> {
                actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute();
            });

        // The underlying commit went through, so the table must reflect the rewrite.
        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
        assertOnlyExistingFiles(newManifests.get(0), 4);

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records1);
        expectedRecords.addAll(records2);
        Assert.assertEquals("Rows must match", expectedRecords, readSortedRecords());
    }

    @Test
    public void testRewriteSmallManifestsPartitionedTable() {
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
        Table table = createTable(spec);

        List<ThreeColumnRecord> records1 = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "AAAA"),
            new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
        writeRecords(records1);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(
            new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
            new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD"));
        writeRecords(records2);

        List<ThreeColumnRecord> records3 = Lists.newArrayList(
            new ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE"),
            new ThreeColumnRecord(3, "FFFFFFFFFF", "FFFF"));
        writeRecords(records3);

        List<ThreeColumnRecord> records4 = Lists.newArrayList(
            new ThreeColumnRecord(4, "GGGGGGGGGG", "GGGG"),
            new ThreeColumnRecord(4, "HHHHHHHHHG", "HHHH"));
        writeRecords(records4);

        table.refresh();
        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());

        SparkActions actions = SparkActions.get();

        // Target slightly more than four entries per manifest, so the rewrite is expected to
        // produce two manifests holding four entries each.
        long manifestEntrySizeBytes = computeManifestEntrySizeBytes(manifests);
        long targetManifestSizeBytes = (long) (4.2d * manifestEntrySizeBytes);
        table.updateProperties()
            .set(MANIFEST_TARGET_SIZE_BYTES, String.valueOf(targetManifestSizeBytes))
            .commit();

        RewriteManifests.Result result = actions
            .rewriteManifests(table)
            .rewriteIf(manifest -> true)
            .execute();

        Assert.assertEquals("Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
        Assert.assertEquals("Action should add 2 manifests", 2, Iterables.size(result.addedManifests()));

        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
        assertOnlyExistingFiles(newManifests.get(0), 4);
        assertOnlyExistingFiles(newManifests.get(1), 4);

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records1);
        expectedRecords.addAll(records2);
        expectedRecords.addAll(records3);
        expectedRecords.addAll(records4);
        Assert.assertEquals("Rows must match", expectedRecords, readSortedRecords());
    }

    @Test
    public void testRewriteImportedManifests() throws IOException {
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
        Table table = createTable(spec);

        List<ThreeColumnRecord> records = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "AAAA"),
            new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));

        try {
            String parquetTableLocation = temp.newFolder("parquet_table").toURI().toString();
            Dataset<Row> inputDF = spark.createDataFrame(records, ThreeColumnRecord.class);
            inputDF.select("c1", "c2", "c3")
                .write()
                .format("parquet")
                .mode("overwrite")
                .option("path", parquetTableLocation)
                .partitionBy("c3")
                .saveAsTable("parquet_table");

            String stagingDir = temp.newFolder("staging-dir").toString();
            SparkTableUtil.importSparkTable(spark, new TableIdentifier("parquet_table"), table, stagingDir);

            // Capture the imported manifests before the rewrite replaces them.
            List<ManifestFile> importedManifests = table.currentSnapshot().allManifests(table.io());

            RewriteManifests.Result result = SparkActions.get()
                .rewriteManifests(table)
                .rewriteIf(manifest -> true)
                .stagingLocation(temp.newFolder().toString())
                .execute();

            Assert.assertEquals("Action should rewrite all manifests", importedManifests, result.rewrittenManifests());
            Assert.assertEquals("Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
        } finally {
            // IF EXISTS: if table creation failed, do not mask the original error with a drop failure.
            spark.sql("DROP TABLE IF EXISTS parquet_table");
        }
    }

    @Test
    public void testRewriteLargeManifestsPartitionedTable() throws IOException {
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
        Table table = createTable(spec);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        for (int i = 0; i < 50; i++) {
            records.add(new ThreeColumnRecord(i, String.valueOf(i), "0"));
        }
        Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
        // Repartition on c1 so the single append writes many separate data files.
        writeDF(df.repartition(50, df.col("c1")));

        table.refresh();
        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());

        // Half the current manifest size, so the rewrite is expected to split it in two.
        long targetManifestSizeBytes = manifests.get(0).length() / 2;
        table.updateProperties()
            .set(MANIFEST_TARGET_SIZE_BYTES, String.valueOf(targetManifestSizeBytes))
            .commit();

        RewriteManifests.Result result = SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> true)
            .stagingLocation(temp.newFolder().toString())
            .execute();

        Assert.assertEquals("Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
        Assert.assertEquals("Action should add 2 manifests", 2, Iterables.size(result.addedManifests()));

        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());

        Assert.assertEquals("Rows must match", records, readSortedRecords());
    }

    @Test
    public void testRewriteManifestsWithPredicate() throws IOException {
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
        Table table = createTable(spec);

        List<ThreeColumnRecord> records1 = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "AAAA"),
            new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
        writeRecords(records1);

        List<ThreeColumnRecord> records2 = Lists.newArrayList(
            new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
            new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD"));
        writeRecords(records2);

        table.refresh();
        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

        // Rewrite only the first manifest, with caching disabled.
        RewriteManifests.Result result = SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path()))
            .stagingLocation(temp.newFolder().toString())
            .option("use-caching", "false")
            .execute();

        Assert.assertEquals("Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
        Assert.assertEquals("Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
        Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
        Assert.assertTrue("Second manifest must not be rewritten", newManifests.contains(manifests.get(1)));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records1);
        expectedRecords.addAll(records2);
        Assert.assertEquals("Rows must match", expectedRecords, readSortedRecords());
    }

    @Test
    public void testRewriteSmallManifestsNonPartitionedV2Table() {
        // This test pins format-version 2 and deliberately does not set the parameterized property.
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "2"), tableLocation);

        List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, null, "AAAA"));
        writeRecords(records1);

        table.refresh();
        Snapshot snapshot1 = table.currentSnapshot();
        DataFile file1 = Iterables.getOnlyElement(snapshot1.addedDataFiles(table.io()));

        List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "CCCC", "CCCC"));
        writeRecords(records2);

        table.refresh();
        Snapshot snapshot2 = table.currentSnapshot();
        DataFile file2 = Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));

        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

        RewriteManifests.Result result = SparkActions.get().rewriteManifests(table).execute();
        Assert.assertEquals("Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
        Assert.assertEquals("Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

        table.refresh();
        List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());

        ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
        assertOnlyExistingFiles(newManifest, 2);

        // The rewritten entries must keep the sequence numbers and snapshot ids of their original commits.
        ValidationHelpers.validateDataManifest(
            table,
            newManifest,
            ValidationHelpers.dataSeqs(1L, 2L),
            ValidationHelpers.fileSeqs(1L, 2L),
            ValidationHelpers.snapshotIds(snapshot1.snapshotId(), snapshot2.snapshotId()),
            ValidationHelpers.files(file1, file2));

        List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(records1);
        expectedRecords.addAll(records2);
        Assert.assertEquals("Rows must match", expectedRecords, readSortedRecords());
    }

    /**
     * Creates a Hadoop table at {@link #tableLocation} with {@link #SCHEMA}, the given spec and the
     * parameterized snapshot-id-inheritance property.
     */
    private Table createTable(PartitionSpec spec) {
        return TABLES.create(
            SCHEMA, spec, ImmutableMap.of(SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled), tableLocation);
    }

    /** Asserts that a manifest holds exactly {@code expectedExistingFiles} existing entries and nothing else. */
    private static void assertOnlyExistingFiles(ManifestFile manifest, int expectedExistingFiles) {
        Assert.assertEquals(expectedExistingFiles, manifest.existingFilesCount().intValue());
        Assert.assertFalse(manifest.hasAddedFiles());
        Assert.assertFalse(manifest.hasDeletedFiles());
    }

    /** Reads the Iceberg table back as beans, sorted by (c1, c2) so it compares deterministically. */
    private List<ThreeColumnRecord> readSortedRecords() {
        return spark.read()
            .format("iceberg")
            .load(tableLocation)
            .sort("c1", "c2")
            .as(Encoders.bean(ThreeColumnRecord.class))
            .collectAsList();
    }

    /** Appends the given records to the Iceberg table at {@link #tableLocation}. */
    private void writeRecords(List<ThreeColumnRecord> records) {
        writeDF(spark.createDataFrame(records, ThreeColumnRecord.class));
    }

    /** Appends columns c1, c2, c3 of {@code df} to the Iceberg table at {@link #tableLocation}. */
    private void writeDF(Dataset<Row> df) {
        df.select("c1", "c2", "c3")
            .write()
            .format("iceberg")
            .mode("append")
            .save(tableLocation);
    }

    /**
     * Returns the average manifest size per entry (added + existing + deleted) across {@code manifests},
     * using integer division.
     *
     * @throws IllegalStateException if the manifests contain no entries at all
     */
    private long computeManifestEntrySizeBytes(List<ManifestFile> manifests) {
        long totalSize = 0L;
        long numEntries = 0L;

        for (ManifestFile manifest : manifests) {
            totalSize += manifest.length();
            // Accumulate as long so large entry counts cannot overflow an int.
            numEntries += manifest.addedFilesCount();
            numEntries += manifest.existingFilesCount();
            numEntries += manifest.deletedFilesCount();
        }

        if (numEntries == 0) {
            throw new IllegalStateException("Cannot compute entry size: manifests contain no entries");
        }

        return totalSize / numEntries;
    }
}
