package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for Iceberg Spark data source options.
 *
 * <p>Covered areas:
 *
 * <ul>
 *   <li>write-format overrides of the table default;
 *   <li>Hadoop configuration pass-through via {@code hadoop.*} options;
 *   <li>split-size overrides for data and metadata ({@code #entries}) reads;
 *   <li>incremental-scan option validation;
 *   <li>extra snapshot-summary properties, supplied either as {@code snapshot-property.*} write
 *       options or through {@link CommitMetadata}.
 * </ul>
 */
public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
    private static final Configuration CONF = new Configuration();

    private static final Schema SCHEMA =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** 128 MB; used as an explicit data/metadata split size by the split-size tests. */
    private static final long SPLIT_SIZE_128_MB = 128L * 1024 * 1024;

    /** 32 MB; the metadata split size {@link #testDefaultMetadataSplitSize} expects by default. */
    private static final long METADATA_SPLIT_SIZE_32_MB = 32L * 1024 * 1024;

    /** Session used by every test in this class; set in startSpark, cleared in stopSpark. */
    private static SparkSession spark = null;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        // Guard against a failed startSpark so the real error is not masked by an NPE here.
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    @Test
    public void testWriteFormatOptionOverridesTableProperties() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table =
            createUnpartitionedTable(tableLocation, ImmutableMap.of("write.format.default", "avro"));

        List<SimpleRecord> records =
            Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        recordsDf(records)
            .write()
            .format("iceberg")
            .option("write-format", "parquet")
            .mode(SaveMode.Append)
            .save(tableLocation);

        // The per-write option must win over the table's Avro default.
        assertDataFilesHaveFormat(table, FileFormat.PARQUET);
    }

    @Test
    public void testNoWriteFormatOption() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table =
            createUnpartitionedTable(tableLocation, ImmutableMap.of("write.format.default", "avro"));

        List<SimpleRecord> records =
            Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        recordsDf(records).write().format("iceberg").mode("append").save(tableLocation);

        // Without a write-format option the table default applies.
        assertDataFilesHaveFormat(table, FileFormat.AVRO);
    }

    @Test
    public void testHadoopOptions() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
        String originalDefaultFS = sparkHadoopConf.get("fs.default.name");

        try {
            createUnpartitionedTable(tableLocation, ImmutableMap.of());

            // Point the default FS at hdfs://localhost:9000 so that reads/writes only succeed if the
            // 'hadoop.'-prefixed data source option below is honoured.
            // NOTE(review): newHadoopConf() presumably returns a fresh copy of the session's Hadoop
            // configuration, in which case this set() never reaches Spark — confirm this override
            // really exercises the option path.
            sparkHadoopConf.set("fs.default.name", "hdfs://localhost:9000");

            List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
            recordsDf(expectedRecords)
                .write()
                .format("iceberg")
                .mode("append")
                .option("hadoop.fs.default.name", "file:///")
                .save(tableLocation);

            Dataset<Row> resultDf =
                spark
                    .read()
                    .format("iceberg")
                    .option("hadoop.fs.default.name", "file:///")
                    .load(tableLocation);
            Assert.assertEquals("Records should match", expectedRecords, sortedRecords(resultDf));
        } finally {
            // Configuration.set rejects null values; unset instead so a null original value does
            // not replace the test's real failure with an IllegalArgumentException.
            if (originalDefaultFS != null) {
                sparkHadoopConf.set("fs.default.name", originalDefaultFS);
            } else {
                sparkHadoopConf.unset("fs.default.name");
            }
        }
    }

    @Test
    public void testSplitOptionsOverridesTableProperties() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table =
            createUnpartitionedTable(
                tableLocation,
                ImmutableMap.of(
                    "read.split.target-size", String.valueOf(SPLIT_SIZE_128_MB),
                    "write.format.default", String.valueOf(FileFormat.AVRO)));

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        recordsDf(records)
            .repartition(1)
            .write()
            .format("iceberg")
            .mode("append")
            .save(tableLocation);

        List<DataFile> files =
            Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()));
        Assert.assertEquals("Should have written 1 file", 1, files.size());

        // Half the file size (rounded up) as the read option should split the file into two tasks.
        long splitSize = LongMath.divide(files.get(0).fileSizeInBytes(), 2, RoundingMode.CEILING);
        Dataset<Row> resultDf =
            spark
                .read()
                .format("iceberg")
                .option("split-size", String.valueOf(splitSize))
                .load(tableLocation);
        Assert.assertEquals(
            "Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());
    }

    @Test
    public void testIncrementalScanOptions() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table = createUnpartitionedTable(tableLocation, ImmutableMap.of());

        List<SimpleRecord> expectedRecords =
            Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, "d"));
        // One append (and therefore one snapshot) per record.
        for (SimpleRecord record : expectedRecords) {
            recordsDf(Lists.newArrayList(record))
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);
        }

        // Ids run newest-first (per the assertions below): index 3 is the first append, 0 the last.
        List<Long> snapshotIds = SnapshotUtil.currentAncestorIds(table);

        // start-snapshot-id combined with snapshot-id is rejected.
        Assertions.assertThatThrownBy(
                () ->
                    spark
                        .read()
                        .format("iceberg")
                        .option("snapshot-id", snapshotIds.get(3).toString())
                        .option("start-snapshot-id", snapshotIds.get(3).toString())
                        .load(tableLocation)
                        .explain())
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage(
                "Cannot set start-snapshot-id and end-snapshot-id for incremental scans when either snapshot-id or as-of-timestamp is set");

        // end-snapshot-id combined with as-of-timestamp is rejected.
        Assertions.assertThatThrownBy(
                () ->
                    spark
                        .read()
                        .format("iceberg")
                        .option(
                            "as-of-timestamp",
                            Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
                        .option("end-snapshot-id", snapshotIds.get(2).toString())
                        .load(tableLocation)
                        .explain())
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage(
                "Cannot set start-snapshot-id and end-snapshot-id for incremental scans when either snapshot-id or as-of-timestamp is set");

        // end-snapshot-id alone is rejected.
        Assertions.assertThatThrownBy(
                () ->
                    spark
                        .read()
                        .format("iceberg")
                        .option("end-snapshot-id", snapshotIds.get(2).toString())
                        .load(tableLocation)
                        .explain())
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage(
                "Cannot set only end-snapshot-id for incremental scans. Please, set start-snapshot-id too.");

        // (1st snapshot, current snapshot] picks up records 2..4.
        Dataset<Row> sinceFirstDf =
            spark
                .read()
                .format("iceberg")
                .option("start-snapshot-id", snapshotIds.get(3).toString())
                .load(tableLocation);
        Assert.assertEquals(
            "Records should match", expectedRecords.subList(1, 4), sortedRecords(sinceFirstDf));

        // (2nd snapshot, 3rd snapshot] picks up only record 3.
        Dataset<Row> rangeDf =
            spark
                .read()
                .format("iceberg")
                .option("start-snapshot-id", snapshotIds.get(2).toString())
                .option("end-snapshot-id", snapshotIds.get(1).toString())
                .load(tableLocation);
        Assert.assertEquals(
            "Records should match", expectedRecords.subList(2, 3), sortedRecords(rangeDf));
        Assert.assertEquals("Unprocessed count should match record count", 1L, rangeDf.count());
    }

    @Test
    public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table = createUnpartitionedTable(tableLocation, ImmutableMap.of());

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> df = recordsDf(records);
        // Two separate appends produce two manifests.
        df.write().format("iceberg").mode("append").save(tableLocation);
        df.write().format("iceberg").mode("append").save(tableLocation);

        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assert.assertEquals("Must be 2 manifests", 2, manifests.size());

        // A table-level metadata split size equal to one manifest's length is expected to yield
        // one split per manifest.
        table
            .updateProperties()
            .set("read.split.metadata-target-size", String.valueOf(manifests.get(0).length()))
            .commit();

        Dataset<Row> entriesDf = spark.read().format("iceberg").load(tableLocation + "#entries");
        Assert.assertEquals("Num partitions must match", 2, entriesDf.javaRDD().getNumPartitions());

        // The 'split-size' read option overrides the table property.
        Dataset<Row> overriddenDf =
            spark
                .read()
                .format("iceberg")
                .option("split-size", String.valueOf(SPLIT_SIZE_128_MB))
                .load(tableLocation + "#entries");
        Assert.assertEquals(
            "Num partitions must match", 1, overriddenDf.javaRDD().getNumPartitions());
    }

    @Test
    public void testDefaultMetadataSplitSize() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        Table table =
            tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        recordsDf(records).write().format("iceberg").mode("append").save(tableLocation);

        long manifestLength =
            tables
                .load(tableLocation + "#entries")
                .currentSnapshot()
                .allManifests(table.io())
                .get(0)
                .length();
        // ceil(manifestLength / splitSize), computed in long to avoid int overflow.
        int expectedSplits =
            (int) LongMath.divide(manifestLength, METADATA_SPLIT_SIZE_32_MB, RoundingMode.CEILING);

        Dataset<Row> metadataDf = spark.read().format("iceberg").load(tableLocation + "#entries");
        Assert.assertEquals(
            "Spark partitions should match",
            expectedSplits,
            metadataDf.javaRDD().getNumPartitions());
    }

    @Test
    public void testExtraSnapshotMetadata() throws IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        recordsDf(records)
            .write()
            .format("iceberg")
            .mode("append")
            .option("snapshot-property.extra-key", "someValue")
            .option("snapshot-property.another-key", "anotherValue")
            .save(tableLocation);

        Map<String, String> summary = tables.load(tableLocation).currentSnapshot().summary();
        // assertEquals (not assertTrue(equals)) so a missing key fails cleanly instead of NPE-ing.
        Assert.assertEquals("someValue", summary.get("extra-key"));
        Assert.assertEquals("anotherValue", summary.get("another-key"));
    }

    @Test
    public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
        String tableLocation = temp.newFolder("iceberg-table").toString();
        Table table = createUnpartitionedTable(tableLocation, ImmutableMap.of());

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        recordsDf(records).write().format("iceberg").mode("append").save(tableLocation);
        spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");

        runSqlWithCommitPropertiesInThread(
            "test-extra-commit-message-writer-thread",
            "INSERT INTO target VALUES (3, 'c'), (4, 'd')");

        List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
        Assert.assertEquals(2, snapshots.size());
        Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
        Assertions.assertThat(snapshots.get(1).summary())
            .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
            .containsEntry("extra-key", "someValue")
            .containsEntry("another-key", "anotherValue");
    }

    @Test
    public void testExtraSnapshotMetadataWithDelete()
            throws InterruptedException, NoSuchTableException {
        spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);

        List<SimpleRecord> records =
            Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        spark
            .createDataFrame(records, SimpleRecord.class)
            .repartition(5, new Column("data"))
            .select("id", "data")
            .writeTo(tableName)
            .append();

        runSqlWithCommitPropertiesInThread(
            "test-extra-commit-message-delete-thread",
            "DELETE FROM " + tableName + " where id = 1");

        List<Snapshot> snapshots =
            Lists.newArrayList(validationCatalog.loadTable(tableIdent).snapshots());
        Assert.assertEquals(2, snapshots.size());
        Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
        Assertions.assertThat(snapshots.get(1).summary())
            .containsEntry("writer-thread", "test-extra-commit-message-delete-thread")
            .containsEntry("extra-key", "someValue")
            .containsEntry("another-key", "anotherValue");
    }

    /** Creates an unpartitioned path-based table with {@link #SCHEMA} at {@code location}. */
    private static Table createUnpartitionedTable(String location, Map<String, String> properties) {
        return new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);
    }

    /** Builds a DataFrame of {@code records} projected to the table's {@code id, data} columns. */
    private static Dataset<Row> recordsDf(List<SimpleRecord> records) {
        return spark.createDataFrame(records, SimpleRecord.class).select("id", "data");
    }

    /** Collects {@code df} as {@link SimpleRecord}s ordered by {@code id}. */
    private static List<SimpleRecord> sortedRecords(Dataset<Row> df) {
        return df.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
    }

    /**
     * Asserts that every file planned by a scan of {@code table} has format {@code expected}, and
     * that at least one file was planned so the check cannot pass vacuously.
     *
     * @throws IOException if closing the planned task iterable fails
     */
    private static void assertDataFilesHaveFormat(Table table, FileFormat expected)
            throws IOException {
        int fileCount = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                Assert.assertEquals(expected, FileFormat.fromFileName(task.file().path()));
                fileCount++;
            }
        }
        Assert.assertTrue("Expected at least one data file", fileCount > 0);
    }

    /**
     * Runs {@code sqlText} on a new thread named {@code threadName} and waits for it to finish.
     *
     * <p>The statement runs inside {@link CommitMetadata#withCommitProperties} with the writer
     * thread's name plus two {@code snapshot-property.*} entries. Any failure on that thread is
     * rethrown here instead of being lost, so the test reports the real error rather than a later
     * snapshot-count mismatch.
     *
     * @throws InterruptedException if interrupted while joining the worker thread
     */
    private static void runSqlWithCommitPropertiesInThread(String threadName, String sqlText)
            throws InterruptedException {
        Throwable[] failure = new Throwable[1];
        Thread thread =
            new Thread(
                () -> {
                    try {
                        CommitMetadata.withCommitProperties(
                            ImmutableMap.of(
                                "writer-thread", Thread.currentThread().getName(),
                                "snapshot-property.extra-key", "someValue",
                                "snapshot-property.another-key", "anotherValue"),
                            () -> {
                                spark.sql(sqlText);
                                return 0;
                            },
                            RuntimeException.class);
                    } catch (Throwable t) {
                        failure[0] = t;
                    }
                });
        thread.setName(threadName);
        thread.start();
        // join() establishes happens-before, so failure[0] is safely visible afterwards.
        thread.join();
        if (failure[0] != null) {
            throw new AssertionError("Thread " + threadName + " failed", failure[0]);
        }
    }
}
