package org.apache.iceberg.spark.procedures;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.ChangelogIterator;
import org.apache.iceberg.spark.procedures.BaseProcedure;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.source.SparkChangelogTable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

/* loaded from: input_file:org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.class */
public class CreateChangelogViewProcedure extends BaseProcedure {
    private static final ProcedureParameter TABLE_PARAM = ProcedureParameter.required("table", DataTypes.StringType);
    private static final ProcedureParameter CHANGELOG_VIEW_PARAM = ProcedureParameter.optional("changelog_view", DataTypes.StringType);
    private static final ProcedureParameter OPTIONS_PARAM = ProcedureParameter.optional("options", STRING_MAP);
    private static final ProcedureParameter COMPUTE_UPDATES_PARAM = ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
    private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
    private static final ProcedureParameter NET_CHANGES = ProcedureParameter.optional("net_changes", DataTypes.BooleanType);
    private static final ProcedureParameter[] PARAMETERS = {TABLE_PARAM, CHANGELOG_VIEW_PARAM, OPTIONS_PARAM, COMPUTE_UPDATES_PARAM, IDENTIFIER_COLUMNS_PARAM, NET_CHANGES};
    private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())});

    public static SparkProcedures.ProcedureBuilder builder() {
        return new BaseProcedure.Builder<CreateChangelogViewProcedure>() { // from class: org.apache.iceberg.spark.procedures.CreateChangelogViewProcedure.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.iceberg.spark.procedures.BaseProcedure.Builder
            public CreateChangelogViewProcedure doBuild() {
                return new CreateChangelogViewProcedure(tableCatalog());
            }
        };
    }

    private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    @Override // org.apache.spark.sql.connector.iceberg.catalog.Procedure
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override // org.apache.spark.sql.connector.iceberg.catalog.Procedure
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    @Override // org.apache.spark.sql.connector.iceberg.catalog.Procedure
    public InternalRow[] call(InternalRow internalRow) {
        Dataset<Row> removeCarryoverRows;
        ProcedureInput procedureInput = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, internalRow);
        Identifier ident = procedureInput.ident(TABLE_PARAM);
        Dataset<Row> loadRows = loadRows(changelogTableIdent(ident), options(procedureInput));
        boolean booleanValue = procedureInput.asBoolean(NET_CHANGES, false).booleanValue();
        if (shouldComputeUpdateImages(procedureInput)) {
            Preconditions.checkArgument(!booleanValue, "Not support net changes with update images");
            removeCarryoverRows = computeUpdateImages(identifierColumns(procedureInput, ident), loadRows);
        } else {
            removeCarryoverRows = removeCarryoverRows(loadRows, booleanValue);
        }
        String viewName = viewName(procedureInput, ident.name());
        removeCarryoverRows.createOrReplaceTempView(viewName);
        return toOutputRows(viewName);
    }

    private Dataset<Row> computeUpdateImages(String[] strArr, Dataset<Row> dataset) {
        Preconditions.checkArgument(strArr.length > 0, "Cannot compute the update images because identifier columns are not set");
        Column[] columnArr = new Column[strArr.length + 1];
        for (int i = 0; i < strArr.length; i++) {
            columnArr[i] = dataset.col(strArr[i]);
        }
        columnArr[columnArr.length - 1] = dataset.col(MetadataColumns.CHANGE_ORDINAL.name());
        return applyChangelogIterator(dataset, columnArr);
    }

    private boolean shouldComputeUpdateImages(ProcedureInput procedureInput) {
        return procedureInput.asBoolean(COMPUTE_UPDATES_PARAM, Boolean.valueOf(procedureInput.isProvided(IDENTIFIER_COLUMNS_PARAM))).booleanValue();
    }

    private Dataset<Row> removeCarryoverRows(Dataset<Row> dataset, boolean z) {
        Predicate predicate;
        if (z) {
            HashSet newHashSet = Sets.newHashSet(new String[]{MetadataColumns.CHANGE_TYPE.name(), MetadataColumns.CHANGE_ORDINAL.name(), MetadataColumns.COMMIT_SNAPSHOT_ID.name()});
            predicate = str -> {
                return !newHashSet.contains(str);
            };
        } else {
            predicate = str2 -> {
                return !str2.equals(MetadataColumns.CHANGE_TYPE.name());
            };
        }
        Stream filter = Arrays.stream(dataset.columns()).filter(predicate);
        dataset.getClass();
        return applyCarryoverRemoveIterator(dataset, (Column[]) filter.map(dataset::col).toArray(i -> {
            return new Column[i];
        }), z);
    }

    private String[] identifierColumns(ProcedureInput procedureInput, Identifier identifier) {
        return procedureInput.isProvided(IDENTIFIER_COLUMNS_PARAM) ? procedureInput.asStringArray(IDENTIFIER_COLUMNS_PARAM) : (String[]) loadSparkTable(identifier).table().schema().identifierFieldNames().toArray(new String[0]);
    }

    private Identifier changelogTableIdent(Identifier identifier) {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.addAll(Arrays.asList(identifier.namespace()));
        newArrayList.add(identifier.name());
        return Identifier.of((String[]) newArrayList.toArray(new String[0]), SparkChangelogTable.TABLE_NAME);
    }

    private Map<String, String> options(ProcedureInput procedureInput) {
        return procedureInput.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
    }

    private String viewName(ProcedureInput procedureInput, String str) {
        return procedureInput.asString(CHANGELOG_VIEW_PARAM, String.format("`%s_changes`", str));
    }

    private Dataset<Row> applyChangelogIterator(Dataset<Row> dataset, Column[] columnArr) {
        Column[] sortSpec = sortSpec(dataset, columnArr, false);
        StructType schema = dataset.schema();
        String[] strArr = (String[]) Arrays.stream(columnArr).map((v0) -> {
            return v0.toString();
        }).toArray(i -> {
            return new String[i];
        });
        return dataset.repartition(columnArr).sortWithinPartitions(sortSpec).mapPartitions(it -> {
            return ChangelogIterator.computeUpdates(it, schema, strArr);
        }, RowEncoder.apply(schema));
    }

    private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> dataset, Column[] columnArr, boolean z) {
        Column[] sortSpec = sortSpec(dataset, columnArr, z);
        StructType schema = dataset.schema();
        return dataset.repartition(columnArr).sortWithinPartitions(sortSpec).mapPartitions(it -> {
            return z ? ChangelogIterator.removeNetCarryovers(it, schema) : ChangelogIterator.removeCarryovers(it, schema);
        }, RowEncoder.apply(schema));
    }

    private static Column[] sortSpec(Dataset<Row> dataset, Column[] columnArr, boolean z) {
        Column col = dataset.col(MetadataColumns.CHANGE_TYPE.name());
        Column[] columnArr2 = z ? new Column[]{dataset.col(MetadataColumns.CHANGE_ORDINAL.name()), col} : new Column[]{col};
        Column[] columnArr3 = new Column[columnArr.length + columnArr2.length];
        System.arraycopy(columnArr, 0, columnArr3, 0, columnArr.length);
        System.arraycopy(columnArr2, 0, columnArr3, columnArr.length, columnArr2.length);
        return columnArr3;
    }

    private InternalRow[] toOutputRows(String str) {
        return new InternalRow[]{newInternalRow(UTF8String.fromString(str))};
    }

    @Override // org.apache.spark.sql.connector.iceberg.catalog.Procedure
    public String description() {
        return "CreateChangelogViewProcedure";
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1923649999:
                if (implMethodName.equals("lambda$applyChangelogIterator$9fa14ecd$1")) {
                    z = true;
                    break;
                }
                break;
            case 2008184820:
                if (implMethodName.equals("lambda$applyCarryoverRemoveIterator$e6d30ace$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapPartitionsFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure") && serializedLambda.getImplMethodSignature().equals("(ZLorg/apache/spark/sql/types/StructType;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    StructType structType = (StructType) serializedLambda.getCapturedArg(1);
                    return it -> {
                        return booleanValue ? ChangelogIterator.removeNetCarryovers(it, structType) : ChangelogIterator.removeCarryovers(it, structType);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapPartitionsFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/sql/types/StructType;[Ljava/lang/String;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    StructType structType2 = (StructType) serializedLambda.getCapturedArg(0);
                    String[] strArr = (String[]) serializedLambda.getCapturedArg(1);
                    return it2 -> {
                        return ChangelogIterator.computeUpdates(it2, structType2, strArr);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
