package org.apache.paimon.spark.procedure;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.procedure.BaseProcedure;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.PaimonUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Spark stored procedure that runs a compaction on a Paimon table.
 *
 * <p>Arguments, in positional order (see {@link #PARAMETERS}): the required table identifier,
 * then the optional partition spec, {@code order_strategy}, {@code order_by} and {@code where}.
 * The partition spec and {@code where} are mutually exclusive. The procedure returns a single row
 * with one boolean column named {@code result}.
 *
 * <p>The functions shipped to Spark executors delegate to {@code static} helpers so that the
 * closures only capture their explicit arguments and never this procedure instance.
 */
public class CompactProcedure extends BaseProcedure {

    // NOTE(review): the name of the second parameter and the two split separators used below are
    // taken from constants of unrelated classes (PartitionsTable, FieldListaggAgg, CsvSchema).
    // They presumably resolve to "partitions", "," and ";" -- confirm and consider local literals.
    private static final ProcedureParameter[] PARAMETERS = {
        ProcedureParameter.required("table", DataTypes.StringType),
        ProcedureParameter.optional(PartitionsTable.PARTITIONS, DataTypes.StringType),
        ProcedureParameter.optional("order_strategy", DataTypes.StringType),
        ProcedureParameter.optional("order_by", DataTypes.StringType),
        ProcedureParameter.optional("where", DataTypes.StringType)
    };

    private static final StructType OUTPUT_TYPE =
            new StructType(
                    new StructField[] {
                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
                    });

    protected CompactProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    /** Returns the parameter declarations of this procedure. */
    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    /** Returns the schema of the rows produced by {@link #call(InternalRow)}. */
    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    /**
     * Validates the arguments and runs the compaction.
     *
     * @param args argument row laid out in the order of {@link #PARAMETERS}
     * @return a single row holding the boolean returned by the compaction
     * @throws IllegalArgumentException if {@code order_by} columns are given while the order
     *     strategy is "none"; the other argument checks go through {@code
     *     Preconditions.checkArgument}
     */
    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
        String partitions = blank(args, 1) ? null : args.getString(1);
        String sortType =
                blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
        List<String> sortColumns =
                blank(args, 3)
                        ? Collections.<String>emptyList()
                        : Arrays.asList(args.getString(3).split(FieldListaggAgg.DELIMITER));
        String whereArg = blank(args, 4) ? null : args.getString(4);

        if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
            throw new IllegalArgumentException(
                    "order_strategy \"none\" cannot work with order_by columns.");
        }
        Preconditions.checkArgument(
                partitions == null || whereArg == null,
                "partitions and where cannot be used together.");

        // A partition spec is translated into the same kind of filter text as "where".
        String condition = partitions != null ? toWhere(partitions) : whereArg;

        return (InternalRow[])
                modifyPaimonTable(
                        tableIdent,
                        table -> {
                            Preconditions.checkArgument(table instanceof FileStoreTable);
                            LogicalPlan relation = createRelation(tableIdent);
                            Expression filter = null;
                            if (!StringUtils.isBlank(condition)) {
                                filter =
                                        ExpressionUtils.resolveFilter(
                                                spark(), relation, condition);
                                Preconditions.checkArgument(
                                        ExpressionUtils.isValidPredicate(
                                                spark(),
                                                filter,
                                                (String[])
                                                        table.partitionKeys()
                                                                .toArray(new String[0])),
                                        "Only partition predicate is supported, your predicate is %s, but partition keys are %s",
                                        filter,
                                        table.partitionKeys());
                            }
                            boolean result =
                                    execute(
                                            (FileStoreTable) table,
                                            sortType,
                                            sortColumns,
                                            relation,
                                            filter);
                            return new InternalRow[] {newInternalRow(Boolean.valueOf(result))};
                        });
    }

    @Override
    public String description() {
        return "This procedure execute compact action on paimon table.";
    }

    /** Returns true when the argument at {@code ordinal} is null or a blank string. */
    private boolean blank(InternalRow args, int ordinal) {
        return args.isNullAt(ordinal) || StringUtils.isBlank(args.getString(ordinal));
    }

    /**
     * Dispatches to the compaction implementation matching the order strategy and bucket mode.
     *
     * @return always {@code true}; unsupported combinations are reported by exception
     * @throws UnsupportedOperationException if the bucket mode cannot be compacted this way
     */
    private boolean execute(
            FileStoreTable table,
            String sortType,
            List<String> sortColumns,
            LogicalPlan relation,
            @Nullable Expression condition) {
        // Work on a copy of the table whose write-only option is explicitly switched off.
        FileStoreTable compactTable =
                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
        BucketMode bucketMode = compactTable.bucketMode();
        TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);

        if (!orderType.equals(TableSorter.OrderType.NONE)) {
            if (bucketMode != BucketMode.UNAWARE) {
                throw new UnsupportedOperationException(
                        "Spark compact with sort_type "
                                + sortType
                                + " only support unaware-bucket append-only table yet.");
            }
            sortCompactUnAwareBucketTable(
                    compactTable, orderType, sortColumns, relation, condition);
            return true;
        }

        JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
        // NOTE(review): getOrElse(null) is kept exactly as found; confirm what it yields when the
        // conversion produces no predicate.
        Predicate filter =
                condition == null
                        ? null
                        : (Predicate)
                                ExpressionUtils.convertConditionToPaimonPredicate(
                                                condition,
                                                relation.output(),
                                                compactTable.rowType(),
                                                false)
                                        .getOrElse(null);
        switch (bucketMode) {
            case FIXED:
            case DYNAMIC:
                compactAwareBucketTable(compactTable, filter, javaSparkContext);
                return true;
            case UNAWARE:
                compactUnAwareBucketTable(compactTable, filter, javaSparkContext);
                return true;
            default:
                throw new UnsupportedOperationException(
                        "Spark compact with " + bucketMode + " is not support yet.");
        }
    }

    /**
     * Compacts every distinct (partition, bucket) pair found in the current splits of the table,
     * running the compactions in a Spark job and committing the collected messages here.
     */
    private void compactAwareBucketTable(
            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
        SnapshotReader snapshotReader = table.newSnapshotReader();
        if (filter != null) {
            snapshotReader.withFilter(filter);
        }

        // De-duplicate on the (partition, bucket) pair first, then serialize the partition row
        // so the pairs can be distributed as plain bytes.
        List<Pair<byte[], Integer>> partitionBuckets =
                snapshotReader.read().splits().stream()
                        .map(split -> (DataSplit) split)
                        .map(split -> Pair.of(split.partition(), Integer.valueOf(split.bucket())))
                        .distinct()
                        .map(
                                pair ->
                                        Pair.of(
                                                SerializationUtils.serializeBinaryRow(
                                                        pair.getLeft()),
                                                pair.getRight()))
                        .collect(Collectors.toList());
        if (partitionBuckets.isEmpty()) {
            return;
        }

        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        JavaRDD<byte[]> commitMessages =
                javaSparkContext
                        .parallelize(partitionBuckets)
                        .mapPartitions(buckets -> compactBuckets(writeBuilder, buckets));

        // try-with-resources closes the commit on failure as well as on success.
        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            commit.commit(deserializeCommitMessages(commitMessages.collect()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Compacts the given (serialized partition, bucket) pairs with one writer and returns the
     * serialized commit messages. The writer and the IO manager are closed on every exit path.
     */
    private static Iterator<byte[]> compactBuckets(
            BatchWriteBuilder writeBuilder, Iterator<Pair<byte[], Integer>> buckets)
            throws Exception {
        IOManager ioManager = SparkUtils.createIOManager();
        try {
            BatchTableWrite write = writeBuilder.newWrite();
            try {
                write.withIOManager(ioManager);
                while (buckets.hasNext()) {
                    Pair<byte[], Integer> bucket = buckets.next();
                    write.compact(
                            SerializationUtils.deserializeBinaryRow(bucket.getLeft()),
                            bucket.getRight().intValue(),
                            true);
                }

                CommitMessageSerializer serializer = new CommitMessageSerializer();
                List<CommitMessage> messages = write.prepareCommit();
                List<byte[]> serialized = new ArrayList<>(messages.size());
                for (CommitMessage message : messages) {
                    serialized.add(serializer.serialize(message));
                }
                return serialized.iterator();
            } finally {
                write.close();
            }
        } finally {
            // Nested so the IO manager is released even if closing the writer throws.
            ioManager.close();
        }
    }

    /**
     * Plans compaction tasks with {@link AppendOnlyTableCompactionCoordinator}, executes them in a
     * Spark job and commits the collected messages under a freshly generated commit user.
     */
    private void compactUnAwareBucketTable(
            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
        List<AppendOnlyCompactionTask> tasks =
                new AppendOnlyTableCompactionCoordinator(table, false, filter).run();
        if (tasks.isEmpty()) {
            return;
        }

        CompactionTaskSerializer taskSerializer = new CompactionTaskSerializer();
        List<byte[]> serializedTasks = new ArrayList<>(tasks.size());
        try {
            for (AppendOnlyCompactionTask task : tasks) {
                serializedTasks.add(taskSerializer.serialize(task));
            }
        } catch (IOException e) {
            // Keep the cause so the failing task can be diagnosed.
            throw new RuntimeException("serialize compaction task failed", e);
        }

        String commitUser = UUID.randomUUID().toString();
        JavaRDD<byte[]> commitMessages =
                javaSparkContext
                        .parallelize(serializedTasks)
                        .mapPartitions(
                                taskBytes -> runCompactionTasks(table, commitUser, taskBytes));

        try (TableCommitImpl commit = table.newCommit(commitUser)) {
            commit.commit(deserializeCommitMessages(commitMessages.collect()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deserializes and runs the given compaction tasks with one writer and returns the serialized
     * commit messages. The writer is closed on every exit path.
     */
    private static Iterator<byte[]> runCompactionTasks(
            FileStoreTable table, String commitUser, Iterator<byte[]> serializedTasks)
            throws Exception {
        AppendOnlyFileStoreWrite write =
                (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser);
        try {
            CompactionTaskSerializer taskSerializer = new CompactionTaskSerializer();
            CommitMessageSerializer messageSerializer = new CommitMessageSerializer();
            List<byte[]> messages = new ArrayList<>();
            while (serializedTasks.hasNext()) {
                AppendOnlyCompactionTask task =
                        taskSerializer.deserialize(
                                taskSerializer.getVersion(), serializedTasks.next());
                messages.add(messageSerializer.serialize(task.doCompact(write)));
            }
            return messages.iterator();
        } finally {
            write.close();
        }
    }

    /**
     * Deserializes commit messages that were written by a {@link CommitMessageSerializer}, using
     * that serializer's own version (not the version of any other serializer).
     */
    private static List<CommitMessage> deserializeCommitMessages(List<byte[]> serialized)
            throws IOException {
        CommitMessageSerializer serializer = new CommitMessageSerializer();
        int version = serializer.getVersion();
        List<CommitMessage> messages = new ArrayList<>(serialized.size());
        for (byte[] bytes : serialized) {
            messages.add(serializer.deserialize(version, bytes));
        }
        return messages;
    }

    /**
     * Sorts the (optionally filtered) table data with the requested sorter and writes it back
     * through {@link WriteIntoPaimonTable} in dynamic-overwrite mode.
     */
    private void sortCompactUnAwareBucketTable(
            FileStoreTable table,
            TableSorter.OrderType orderType,
            List<String> sortColumns,
            LogicalPlan relation,
            @Nullable Expression condition) {
        TableSorter sorter = TableSorter.getSorter(table, orderType, sortColumns);
        LogicalPlan input = condition == null ? relation : new Filter(condition, relation);
        new WriteIntoPaimonTable(
                        table,
                        DynamicOverWrite$.MODULE$,
                        sorter.sort(PaimonUtils.createDataset(spark(), input)),
                        new Options())
                .run(spark());
    }

    /**
     * Turns a partition spec string into filter text: each partition becomes a parenthesized
     * {@code key=value AND ...} group and the groups are joined with {@code OR}.
     *
     * <p>NOTE(review): values are emitted exactly as given, without quoting.
     *
     * @return the filter text, or {@code null} when no non-empty partition was parsed
     */
    @VisibleForTesting
    static String toWhere(String partitions) {
        List<String> groups =
                ParameterUtils.getPartitions(
                                partitions.split(CsvSchema.DEFAULT_ARRAY_ELEMENT_SEPARATOR))
                        .stream()
                        // An empty partition map contributes no group at all.
                        .filter(spec -> !spec.isEmpty())
                        .map(
                                spec ->
                                        spec.entrySet().stream()
                                                .map(
                                                        entry ->
                                                                entry.getKey()
                                                                        + "="
                                                                        + entry.getValue())
                                                .collect(Collectors.joining(" AND ", "(", ")")))
                        .collect(Collectors.toList());
        return groups.isEmpty() ? null : String.join(" OR ", groups);
    }

    /** Creates the builder that instantiates this procedure with the builder's table catalog. */
    public static ProcedureBuilder builder() {
        return new BaseProcedure.Builder<CompactProcedure>() {
            @Override
            public CompactProcedure doBuild() {
                return new CompactProcedure(tableCatalog());
            }
        };
    }

    // No hand-written $deserializeLambda$ here: the compiler synthesizes that method for the
    // serializable lambdas above, and a source-level copy would clash with the generated one.
}
