package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/iceberg/spark/source/TestForwardCompatibility.class */
public class TestForwardCompatibility {

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private static final Configuration CONF = new Configuration();
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "id", Types.LongType.get()), Types.NestedField.optional(2, "data", Types.StringType.get())});
    private static final PartitionSpec UNKNOWN_SPEC = TestHelpers.newExpectedSpecBuilder().withSchema(SCHEMA).withSpecId(0).addField("zero", 1, "id_zero").build();
    private static final PartitionSpec FAKE_SPEC = TestHelpers.newExpectedSpecBuilder().withSchema(SCHEMA).withSpecId(0).addField("identity", 1, "id_zero").build();
    private static SparkSession spark = null;

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession sparkSession = spark;
        spark = null;
        sparkSession.stop();
    }

    @Test
    public void testSparkWriteFailsUnknownTransform() throws IOException {
        File file = new File(this.temp.newFolder("avro"), "test");
        new File(file, "data").mkdirs();
        new HadoopTables(CONF).create(SCHEMA, UNKNOWN_SPEC, file.toString());
        Dataset createDataFrame = spark.createDataFrame(Lists.newArrayList(new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")}), SimpleRecord.class);
        Assertions.assertThatThrownBy(() -> {
            createDataFrame.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(file.toString());
        }).as("Should reject write with unsupported transform", new Object[0]).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Cannot write using unsupported transforms: zero");
    }

    @Test
    public void testSparkStreamingWriteFailsUnknownTransform() throws IOException, TimeoutException {
        File newFolder = this.temp.newFolder("avro");
        File file = new File(newFolder, "test");
        new File(file, "data").mkdirs();
        File file2 = new File(newFolder, "checkpoint");
        file2.mkdirs();
        new HadoopTables(CONF).create(SCHEMA, UNKNOWN_SPEC, file.toString());
        MemoryStream newMemoryStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        StreamingQuery start = newMemoryStream.toDF().selectExpr(new String[]{"value AS id", "CAST (value AS STRING) AS data"}).writeStream().outputMode("append").format("iceberg").option("checkpointLocation", file2.toString()).option("path", file.toString()).start();
        send(Lists.newArrayList(new Integer[]{1, 2}), newMemoryStream);
        Objects.requireNonNull(start);
        Assertions.assertThatThrownBy(start::processAllAvailable).as("Should reject streaming write with unsupported transform", new Object[0]).isInstanceOf(StreamingQueryException.class).hasMessageContaining("Cannot write using unsupported transforms: zero");
    }

    @Test
    public void testSparkCanReadUnknownTransform() throws IOException {
        File file = new File(this.temp.newFolder("avro"), "test");
        File file2 = new File(file, "data");
        file2.mkdirs();
        Table create = new HadoopTables(CONF).create(SCHEMA, UNKNOWN_SPEC, file.toString());
        create.updateProperties().set("compatibility.snapshot-id-inheritance.enabled", "true").commit();
        List<GenericData.Record> generateList = RandomData.generateList(create.schema(), 100, 1L);
        File file3 = new File(file2, FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()));
        FileAppender build = Parquet.write(Files.localOutput(file3)).schema(create.schema()).build();
        try {
            build.addAll(generateList);
            build.close();
            DataFile build2 = DataFiles.builder(FAKE_SPEC).withInputFile(Files.localInput(file3)).withMetrics(build.metrics()).withPartitionPath("id_zero=0").build();
            ManifestWriter write = ManifestFiles.write(FAKE_SPEC, Files.localOutput(FileFormat.AVRO.addExtension(this.temp.newFile().toString())));
            try {
                write.add(build2);
                write.close();
                create.newFastAppend().appendManifest(write.toManifestFile()).commit();
                List collectAsList = spark.read().format("iceberg").load(file.toString()).collectAsList();
                Assert.assertEquals("Should contain 100 rows", 100L, collectAsList.size());
                for (int i = 0; i < generateList.size(); i++) {
                    org.apache.iceberg.spark.data.TestHelpers.assertEqualsSafe(create.schema().asStruct(), generateList.get(i), (Row) collectAsList.get(i));
                }
            } catch (Throwable th) {
                write.close();
                throw th;
            }
        } catch (Throwable th2) {
            build.close();
            throw th2;
        }
    }

    private <T> MemoryStream<T> newMemoryStream(int i, SQLContext sQLContext, Encoder<T> encoder) {
        return new MemoryStream<>(i, sQLContext, Option.empty(), encoder);
    }

    private <T> void send(List<T> list, MemoryStream<T> memoryStream) {
        memoryStream.addData(JavaConverters.asScalaBuffer(list));
    }
}
