package org.apache.iceberg.spark.source;

import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/* loaded from: input_file:org/apache/iceberg/spark/source/IcebergSource.class */
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
    private SparkSession lazySpark = null;
    private Configuration lazyConf = null;

    public String shortName() {
        return "iceberg";
    }

    public DataSourceReader createReader(DataSourceOptions dataSourceOptions) {
        return createReader(null, dataSourceOptions);
    }

    public DataSourceReader createReader(StructType structType, DataSourceOptions dataSourceOptions) {
        Table tableAndResolveHadoopConfiguration = getTableAndResolveHadoopConfiguration(dataSourceOptions, new Configuration(lazyBaseConf()));
        Reader reader = new Reader(lazySparkSession(), tableAndResolveHadoopConfiguration, Boolean.parseBoolean(lazySparkSession().conf().get("spark.sql.caseSensitive")), dataSourceOptions);
        if (structType != null) {
            SparkSchemaUtil.convert(tableAndResolveHadoopConfiguration.schema(), structType);
            reader.pruneColumns(structType);
        }
        return reader;
    }

    public Optional<DataSourceWriter> createWriter(String str, StructType structType, SaveMode saveMode, DataSourceOptions dataSourceOptions) {
        Preconditions.checkArgument(saveMode == SaveMode.Append || saveMode == SaveMode.Overwrite, "Save mode %s is not supported", saveMode);
        Table tableAndResolveHadoopConfiguration = getTableAndResolveHadoopConfiguration(dataSourceOptions, new Configuration(lazyBaseConf()));
        Preconditions.checkArgument(SparkUtil.canHandleTimestampWithoutZone(dataSourceOptions.asMap(), lazySparkSession().conf()) || !SparkUtil.hasTimestampWithoutZone(tableAndResolveHadoopConfiguration.schema()), SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
        Schema convert = SparkSchemaUtil.convert(tableAndResolveHadoopConfiguration.schema(), structType);
        TypeUtil.validateWriteSchema(tableAndResolveHadoopConfiguration.schema(), convert, Boolean.valueOf(checkNullability(dataSourceOptions)), Boolean.valueOf(checkOrdering(dataSourceOptions)));
        SparkUtil.validatePartitionTransforms(tableAndResolveHadoopConfiguration.spec());
        String applicationId = lazySparkSession().sparkContext().applicationId();
        String str2 = lazySparkSession().conf().get("spark.wap.id", (String) null);
        return Optional.of(new Writer(lazySparkSession(), tableAndResolveHadoopConfiguration, dataSourceOptions, saveMode == SaveMode.Overwrite, applicationId, str2, convert, structType));
    }

    public StreamWriter createStreamWriter(String str, StructType structType, OutputMode outputMode, DataSourceOptions dataSourceOptions) {
        Preconditions.checkArgument(outputMode == OutputMode.Append() || outputMode == OutputMode.Complete(), "Output mode %s is not supported", outputMode);
        Table tableAndResolveHadoopConfiguration = getTableAndResolveHadoopConfiguration(dataSourceOptions, new Configuration(lazyBaseConf()));
        Schema convert = SparkSchemaUtil.convert(tableAndResolveHadoopConfiguration.schema(), structType);
        TypeUtil.validateWriteSchema(tableAndResolveHadoopConfiguration.schema(), convert, Boolean.valueOf(checkNullability(dataSourceOptions)), Boolean.valueOf(checkOrdering(dataSourceOptions)));
        SparkUtil.validatePartitionTransforms(tableAndResolveHadoopConfiguration.spec());
        return new StreamingWriter(lazySparkSession(), tableAndResolveHadoopConfiguration, dataSourceOptions, lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY()), outputMode, lazySparkSession().sparkContext().applicationId(), convert, structType);
    }

    protected Table findTable(DataSourceOptions dataSourceOptions, Configuration configuration) {
        Optional optional = dataSourceOptions.get("path");
        Preconditions.checkArgument(optional.isPresent(), "Cannot open table: path is not set");
        return ((String) optional.get()).contains("/") ? new HadoopTables(configuration).load((String) optional.get()) : CustomCatalogs.table(lazySparkSession(), (String) optional.get());
    }

    private SparkSession lazySparkSession() {
        if (this.lazySpark == null) {
            this.lazySpark = SparkSession.builder().getOrCreate();
        }
        return this.lazySpark;
    }

    private Configuration lazyBaseConf() {
        if (this.lazyConf == null) {
            this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
        }
        return this.lazyConf;
    }

    private Table getTableAndResolveHadoopConfiguration(DataSourceOptions dataSourceOptions, Configuration configuration) {
        mergeIcebergHadoopConfs(configuration, dataSourceOptions.asMap());
        Table findTable = findTable(dataSourceOptions, configuration);
        mergeIcebergHadoopConfs(configuration, findTable.properties());
        mergeIcebergHadoopConfs(configuration, dataSourceOptions.asMap());
        return findTable;
    }

    private static void mergeIcebergHadoopConfs(Configuration configuration, Map<String, String> map) {
        map.keySet().stream().filter(str -> {
            return str.startsWith("hadoop.");
        }).forEach(str2 -> {
            configuration.set(str2.replaceFirst("hadoop.", ""), (String) map.get(str2));
        });
    }

    private boolean checkNullability(DataSourceOptions dataSourceOptions) {
        return Boolean.parseBoolean(this.lazySpark.conf().get("spark.sql.iceberg.check-nullability", "true")) && dataSourceOptions.getBoolean("check-nullability", true);
    }

    private boolean checkOrdering(DataSourceOptions dataSourceOptions) {
        return Boolean.parseBoolean(this.lazySpark.conf().get("spark.sql.iceberg.check-ordering", "true")) && dataSourceOptions.getBoolean("check-ordering", true);
    }
}
