package org.apache.iceberg.spark.procedures;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.procedures.BaseProcedure;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.util.LocationUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.runtime.BoxedUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iceberg/spark/procedures/AddFilesProcedure.class */
/**
 * Spark stored procedure that adds existing data files to an Iceberg table.
 *
 * <p>Arguments, by position:
 * <ul>
 *   <li>{@code table} (required): the target Iceberg table.</li>
 *   <li>{@code source_table} (required): either a catalog table, or a path-based identifier whose single
 *       namespace part is {@code orc}, {@code parquet} or {@code avro} (its name is then used as the path).</li>
 *   <li>{@code partition_filter} (optional): map whose keys must be partition columns of the target table;
 *       it is forwarded to the partition listing / import utilities.</li>
 *   <li>{@code check_duplicate_files} (optional, defaults to {@code true}): forwarded to the import utilities.</li>
 * </ul>
 *
 * <p>Returns a single row whose {@code added_files_count} column is read from the {@code added-data-files}
 * summary entry of the target table's current snapshot after the import (0 when there is no snapshot).
 */
public class AddFilesProcedure extends BaseProcedure {
    /** Renders a partition filter as {@code k1=v1,k2=v2} for error messages. */
    private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");

    private static final ProcedureParameter[] PARAMETERS = {
        ProcedureParameter.required("table", DataTypes.StringType),
        ProcedureParameter.required("source_table", DataTypes.StringType),
        ProcedureParameter.optional("partition_filter", STRING_MAP),
        ProcedureParameter.optional("check_duplicate_files", DataTypes.BooleanType)
    };

    private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
        new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty())
    });

    // Table-property and snapshot-summary keys used by this procedure.
    private static final String NAME_MAPPING_PROPERTY = "schema.name-mapping.default";
    private static final String METADATA_PATH_PROPERTY = "write.metadata.path";
    private static final String ADDED_FILES_SUMMARY_KEY = "added-data-files";

    private AddFilesProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    /** Returns a builder that creates this procedure bound to the builder's table catalog. */
    public static SparkProcedures.ProcedureBuilder builder() {
        return new BaseProcedure.Builder<AddFilesProcedure>() {
            @Override
            public AddFilesProcedure doBuild() {
                return new AddFilesProcedure(tableCatalog());
            }
        };
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    /**
     * Parses the procedure arguments, runs the import and returns the number of added data files.
     *
     * @param args positional arguments laid out as described by {@link #parameters()}
     * @return a one-element array holding a row with the added-files count
     */
    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
        Identifier sourceIdent = toCatalogAndIdentifier(
            args.getString(1),
            PARAMETERS[1].name(),
            spark().sessionState().catalogManager().v2SessionCatalog()).identifier();

        Map<String, String> partitionFilter = Maps.newHashMap();
        if (!args.isNullAt(2)) {
            args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, (key, value) -> {
                partitionFilter.put(key.toString(), value.toString());
                return BoxedUnit.UNIT;
            });
        }

        // check_duplicate_files defaults to true when the argument is omitted.
        boolean checkDuplicateFiles = args.isNullAt(3) || args.getBoolean(3);

        long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter, checkDuplicateFiles);
        return new InternalRow[]{newInternalRow(addedFilesCount)};
    }

    /**
     * Returns true when the identifier names a path-based source, i.e. it has exactly one namespace part and
     * that part is {@code orc}, {@code parquet} or {@code avro} (case-insensitive).
     */
    private static boolean isFileIdentifier(Identifier identifier) {
        String[] namespace = identifier.namespace();
        if (namespace.length != 1) {
            return false;
        }
        String format = namespace[0];
        return format.equalsIgnoreCase("orc")
            || format.equalsIgnoreCase("parquet")
            || format.equalsIgnoreCase("avro");
    }

    /**
     * Validates the filter against the target table, then imports the source into the target table.
     *
     * @return the {@code added-data-files} count from the target's current snapshot, or 0 if it has none
     */
    private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter,
                                 boolean checkDuplicateFiles) {
        Long addedFilesCount = modifyIcebergTable(destIdent, table -> {
            validatePartitionSpec(table, partitionFilter);
            ensureNameMappingPresent(table);

            if (isFileIdentifier(sourceIdent)) {
                Path sourcePath = new Path(sourceIdent.name());
                String format = sourceIdent.namespace()[0];
                importFileTable(table, sourcePath, format, partitionFilter, checkDuplicateFiles);
            } else {
                importCatalogTable(table, sourceIdent, partitionFilter, checkDuplicateFiles);
            }

            // The table may still have no snapshot (e.g. if nothing was committed); report 0 instead of NPE-ing.
            Map<String, String> summary = table.currentSnapshot() != null
                ? table.currentSnapshot().summary()
                : Collections.<String, String>emptyMap();
            return Long.parseLong(summary.getOrDefault(ADDED_FILES_SUMMARY_KEY, "0"));
        });
        return addedFilesCount;
    }

    /** Sets a name mapping derived from the current schema if the table has none yet. */
    private static void ensureNameMappingPresent(Table table) {
        if (table.properties().get(NAME_MAPPING_PROPERTY) == null) {
            table.updateProperties()
                .set(NAME_MAPPING_PROPERTY, NameMappingParser.toJson(MappingUtil.create(table.schema())))
                .commit();
        }
    }

    /**
     * Imports files found under a filesystem path.
     *
     * <p>For an unpartitioned target, the source must expose no partitions and no filter may be given; the
     * whole location is then imported as one partition with no partition values. For a partitioned target,
     * at least one partition matching the filter must be found.
     */
    private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter,
                                 boolean checkDuplicateFiles) {
        List<SparkTableUtil.SparkPartition> partitions =
            Spark3Util.getPartitions(spark(), tableLocation, format, partitionFilter);

        if (table.spec().isUnpartitioned()) {
            Preconditions.checkArgument(partitions.isEmpty(), "Cannot add partitioned files to an unpartitioned table");
            Preconditions.checkArgument(partitionFilter.isEmpty(),
                "Cannot use a partition filter when importing to an unpartitioned table");
            SparkTableUtil.SparkPartition wholeLocation =
                new SparkTableUtil.SparkPartition(Collections.emptyMap(), tableLocation.toString(), format);
            importPartitions(table, ImmutableList.of(wholeLocation), checkDuplicateFiles);
        } else {
            // Report what was searched; the (empty) partition list itself carries no information.
            Preconditions.checkArgument(!partitions.isEmpty(),
                "Cannot find any matching partitions in table %s with partition filter {%s}",
                tableLocation, MAP_JOINER.join(partitionFilter));
            importPartitions(table, partitions, checkDuplicateFiles);
        }
    }

    /** Imports a catalog-registered source table through {@link SparkTableUtil#importSparkTable}. */
    private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter,
                                    boolean checkDuplicateFiles) {
        SparkTableUtil.importSparkTable(spark(), Spark3Util.toV1TableIdentifier(sourceIdent), table,
            getMetadataLocation(table), partitionFilter, checkDuplicateFiles);
    }

    /** Imports the given partitions into the table using the table's current partition spec. */
    private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions,
                                  boolean checkDuplicateFiles) {
        SparkTableUtil.importSparkPartitions(spark(), partitions, table, table.spec(), getMetadataLocation(table),
            checkDuplicateFiles);
    }

    /**
     * Returns the table's metadata directory without a trailing slash: the {@code write.metadata.path}
     * property if set, otherwise {@code <table location>/metadata}.
     */
    private String getMetadataLocation(Table table) {
        String defaultLocation = LocationUtil.stripTrailingSlash(table.location()) + "/metadata";
        String configured = table.properties().getOrDefault(METADATA_PATH_PROPERTY, defaultLocation);
        return LocationUtil.stripTrailingSlash(configured);
    }

    @Override
    public String description() {
        return "AddFiles";
    }

    /**
     * Checks that the target table and partition filter are compatible.
     *
     * <p>Rules:
     * <ul>
     *   <li>The target table may only use identity partition transforms.</li>
     *   <li>A non-empty filter requires a partitioned table.</li>
     *   <li>The filter may not name more columns than the table has partition fields.</li>
     *   <li>Every filter key must be a partition field name.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if any rule is violated
     */
    private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) {
        List<?> nonIdentityFields = table.spec().fields().stream()
            .filter(field -> !field.transform().isIdentity())
            .collect(Collectors.toList());
        Preconditions.checkArgument(nonIdentityFields.isEmpty(),
            "Cannot add data files to target table %s because that table is partitioned and contains non-identity " +
                "partition transforms which will not be compatible. Found non-identity fields %s",
            table.name(), nonIdentityFields);

        int partitionFieldCount = table.spec().fields().size();
        boolean tableIsPartitioned = partitionFieldCount > 0;
        boolean hasPartitionFilter = !partitionFilter.isEmpty();

        if (!tableIsPartitioned || !hasPartitionFilter) {
            // Only a filter on an unpartitioned table is invalid here; everything else needs no further checks.
            Preconditions.checkArgument(!hasPartitionFilter,
                "Cannot use partition filter with an unpartitioned table %s", table.name());
            return;
        }

        Preconditions.checkArgument(partitionFieldCount >= partitionFilter.size(),
            "Cannot add data files to target table %s because that table is partitioned, but the number of columns " +
                "in the provided partition filter (%s) is greater than the number of partitioned columns in table (%s)",
            table.name(), partitionFilter.size(), partitionFieldCount);

        Set<String> partitionNames = table.spec().fields().stream()
            .map(field -> field.name())
            .collect(Collectors.toSet());
        List<String> unknownColumns = partitionFilter.keySet().stream()
            .filter(column -> !partitionNames.contains(column))
            .collect(Collectors.toList());
        Preconditions.checkArgument(unknownColumns.isEmpty(),
            "Cannot add files to target table %s. %s is partitioned but the specified partition filter refers to " +
                "columns that are not partitioned: '%s' . Valid partition columns %s",
            table.name(), table.name(), unknownColumns, String.join(",", partitionNames));
    }
}
