package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/**
 * Tests for writing Iceberg tables through Spark's DataFrame writer.
 *
 * <p>The suite is parameterized over the data file format ("parquet", "avro", "orc"); each test creates a
 * fresh Hadoop table under a temporary folder, writes through the {@code iceberg} source and reads the
 * rows back for comparison.
 */
public class TestSparkDataWrite {
    private static final Configuration CONF = new Configuration();
    private static final Schema SCHEMA = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    private static SparkSession spark = null;

    /** File format written by every test in the current parameterized run. */
    private final FileFormat format;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * How {@link #partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType)} configures the write.
     */
    public enum IcebergOptionsType {
        /** Fanout is not enabled; the input is sorted by {@code data} before writing. */
        NONE,
        /** Fanout is enabled through the {@code write.spark.fanout.enabled} table property. */
        TABLE,
        /** Fanout is enabled through the {@code fanout-enabled} write option. */
        JOB
    }

    /** Format names; each is upper-cased into a {@link FileFormat} by the constructor. */
    @Parameterized.Parameters(name = "format = {0}")
    public static Object[] parameters() {
        return new Object[] {"parquet", "avro", "orc"};
    }

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    /** Drops tables registered with {@link ManualSource} so they do not leak into the next format run. */
    @Parameterized.AfterParam
    public static void clearSourceCache() {
        ManualSource.clearTables();
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession current = spark;
        spark = null;
        current.stop();
    }

    /**
     * @param format file format name supplied by {@link #parameters()}, matched case-insensitively
     */
    public TestSparkDataWrite(String format) {
        this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
    }

    @Test
    public void testBasicWrite() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));

        Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        // Partitioned by "data" with one row per value, so every data file holds exactly one record.
        for (DataFile file : dataFiles(table)) {
            if (!format.equals(FileFormat.AVRO)) {
                Assert.assertNotNull("Split offsets not present", file.splitOffsets());
            }
            Assert.assertEquals("Should have reported record count as 1", 1L, file.recordCount());
            if (format.equals(FileFormat.PARQUET)) {
                Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
                Assert.assertNotNull("Counts metric not present", file.valueCounts());
                Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
                Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
                Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
            }
        }
    }

    @Test
    public void testAppend() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));

        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"),
            new SimpleRecord(4, "a"),
            new SimpleRecord(5, "b"),
            new SimpleRecord(6, "c"));

        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        // Second append shifts the ids by 3 so both batches are distinguishable in the result.
        df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testEmptyOverwrite() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));

        spark.createDataFrame(expected, SimpleRecord.class).select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        // A dynamic overwrite with no rows touches no partitions, so the table must be unchanged.
        spark.createDataFrame(ImmutableList.of(), SimpleRecord.class).select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Overwrite)
            .option("overwrite-mode", "dynamic")
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testOverwrite() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));

        // Only partition id=2 is shared between the two writes; it is replaced, the others are kept or added.
        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "a"),
            new SimpleRecord(3, "c"),
            new SimpleRecord(4, "b"),
            new SimpleRecord(6, "c"));

        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Overwrite)
            .option("overwrite-mode", "dynamic")
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testUnpartitionedOverwrite() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));

        Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        // Overwriting with the same rows must leave a single copy of each, not a duplicate set.
        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Overwrite)
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), location.toString());

        table.updateProperties()
            .set("write.target-file-size-bytes", "4")
            .commit();

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
        for (int i = 0; i < 4000; i++) {
            expected.add(new SimpleRecord(i, "a"));
        }

        spark.createDataFrame(expected, SimpleRecord.class).select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        List<DataFile> files = dataFiles(table);
        // NOTE(review): ORC skips the file-count checks; presumably its writer does not roll files on
        // target size yet. Confirm and remove this early return once it does.
        if (format.equals(FileFormat.ORC)) {
            return;
        }
        Assert.assertEquals("Should have 4 DataFiles", 4L, files.size());
        Assert.assertTrue("All DataFiles contain 1000 rows",
            files.stream().allMatch(file -> file.recordCount() == 1000));
    }

    @Test
    public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.NONE);
    }

    @Test
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.TABLE);
    }

    @Test
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption2() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.JOB);
    }

    @Test
    public void testWriteProjection() throws IOException {
        Assume.assumeTrue(
            "Not supported in Spark 3.0; analysis requires all columns are present",
            spark.version().startsWith("2"));

        File location = new File(temp.newFolder(format.toString()), "test");
        Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), location.toString());

        // Only "id" is written; "data" must read back as null.
        List<SimpleRecord> expected = Lists.newArrayList(
            new SimpleRecord(1, null),
            new SimpleRecord(2, null),
            new SimpleRecord(3, null));

        spark.createDataFrame(expected, SimpleRecord.class).select("id").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        table.refresh();

        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testWriteProjectionWithMiddle() throws IOException {
        Assume.assumeTrue(
            "Not supported in Spark 3.0; analysis requires all columns are present",
            spark.version().startsWith("2"));

        File location = new File(temp.newFolder(format.toString()), "test");
        Schema schema = new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));
        Table table = new HadoopTables(CONF).create(schema, PartitionSpec.unpartitioned(), location.toString());

        // The middle column "c2" is omitted from the write and must read back as null.
        List<ThreeColumnRecord> expected = Lists.newArrayList(
            new ThreeColumnRecord(1, null, "hello"),
            new ThreeColumnRecord(2, null, "world"),
            new ThreeColumnRecord(3, null, null));

        spark.createDataFrame(expected, ThreeColumnRecord.class).select("c1", "c3").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        table.refresh();

        List<ThreeColumnRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("c1")
            .as(Encoders.bean(ThreeColumnRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    @Test
    public void testViewsReturnRecentResults() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));
        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        spark.read()
            .format("iceberg")
            .load(location.toString())
            .where("id = 1")
            .createOrReplaceTempView("tmp");

        List<SimpleRecord> firstRead = spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> firstExpected = Lists.newArrayList(new SimpleRecord(1, "a"));
        Assert.assertEquals("Number of rows should match", firstExpected.size(), firstRead.size());
        Assert.assertEquals("Result rows should match", firstExpected, firstRead);

        // Appending again must be visible through the already-registered view.
        df.select("id", "data").write()
            .format("iceberg")
            .option("write-format", format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());

        List<SimpleRecord> secondRead = spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> secondExpected = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "a"));
        Assert.assertEquals("Number of rows should match", secondExpected.size(), secondRead.size());
        Assert.assertEquals("Result rows should match", secondExpected, secondRead);
    }

    /**
     * Writes 8000 rows (ids 0..1999 for each of data "a".."d") into a table partitioned by {@code data},
     * with a 4-byte target file size, and checks the rows and resulting data files.
     *
     * @param option how the write is configured; see {@link IcebergOptionsType}
     */
    public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) throws IOException {
        File location = new File(temp.newFolder(format.toString()), "test");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
        for (int i = 0; i < 2000; i++) {
            expected.add(new SimpleRecord(i, "a"));
            expected.add(new SimpleRecord(i, "b"));
            expected.add(new SimpleRecord(i, "c"));
            expected.add(new SimpleRecord(i, "d"));
        }

        Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

        switch (option) {
            case NONE:
                df.select("id", "data").sort("data").write()
                    .format("iceberg")
                    .option("write-format", format.toString())
                    .mode(SaveMode.Append)
                    .option("target-file-size-bytes", 4L)
                    .save(location.toString());
                break;
            case TABLE:
                table.updateProperties().set("write.spark.fanout.enabled", "true").commit();
                df.select("id", "data").write()
                    .format("iceberg")
                    .option("write-format", format.toString())
                    .mode(SaveMode.Append)
                    .option("target-file-size-bytes", 4L)
                    .save(location.toString());
                break;
            case JOB:
                df.select("id", "data").write()
                    .format("iceberg")
                    .option("write-format", format.toString())
                    .mode(SaveMode.Append)
                    .option("target-file-size-bytes", 4L)
                    .option("fanout-enabled", true)
                    .save(location.toString());
                break;
        }

        table.refresh();

        // Every id appears once per "data" value, so sort on both columns: ordering by "id" alone leaves
        // the four tied rows in an undefined order and makes the list comparison flaky.
        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id", "data")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        List<DataFile> files = dataFiles(table);
        // NOTE(review): ORC skips the file-count checks; presumably its writer does not roll files on
        // target size yet. Confirm and remove this early return once it does.
        if (format.equals(FileFormat.ORC)) {
            return;
        }
        Assert.assertEquals("Should have 8 DataFiles", 8L, files.size());
        Assert.assertTrue("All DataFiles contain 1000 rows",
            files.stream().allMatch(file -> file.recordCount() == 1000));
    }

    @Test
    public void testCommitUnknownException() throws IOException {
        File location = new File(temp.newFolder(format.toString()), "commitunknown");
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = new HadoopTables(CONF).create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"));
        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

        // commit() on the spy first performs the real commit and then reports an unknown commit state.
        // The final assertions expect the rows to be present, which relies on the spy (a Mockito copy of
        // realAppend) sharing the pending-file state with realAppend.
        AppendFiles realAppend = table.newFastAppend();
        AppendFiles spyAppend = Mockito.spy(realAppend);
        Mockito.doAnswer(invocation -> {
            realAppend.commit();
            throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
        }).when(spyAppend).commit();

        // doReturn avoids invoking the real newAppend() while stubbing the spy.
        Table spyTable = Mockito.spy(table);
        Mockito.doReturn(spyAppend).when(spyTable).newAppend();

        String manualTableName = "unknown_exception";
        ManualSource.setTable(manualTableName, new SparkTable(spyTable, false));

        AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
            SparkException.class,
            "Writing job aborted",
            CommitStateUnknownException.class,
            "Datacenter on Fire",
            () -> df.select("id", "data").sort("data").write()
                .format("org.apache.iceberg.spark.source.ManualSource")
                .option(ManualSource.TABLE_NAME, manualTableName)
                .mode(SaveMode.Append)
                .save(location.toString()));

        // Despite the exception, the data was committed and must be readable.
        List<SimpleRecord> actual = spark.read()
            .format("iceberg")
            .load(location.toString())
            .orderBy("id")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();

        Assert.assertEquals("Number of rows should match", records.size(), actual.size());
        Assert.assertEquals("Result rows should match", records, actual);
    }

    /**
     * Collects every data file listed in the manifests of the table's current snapshot.
     *
     * <p>Each manifest's iterator is closed once it has been drained, so no manifest file handle is left open.
     *
     * @param table table whose current snapshot is inspected; assumes it has a current snapshot
     * @return the data files in manifest order
     * @throws IOException if closing a manifest iterator fails
     */
    private static List<DataFile> dataFiles(Table table) throws IOException {
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
            try (CloseableIterator<DataFile> entries = ManifestFiles.read(manifest, table.io()).iterator()) {
                while (entries.hasNext()) {
                    files.add(entries.next());
                }
            }
        }
        return files;
    }
}
