package org.apache.paimon.spark;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.write.V1Write;
import org.apache.spark.sql.sources.InsertableRelation;

/* loaded from: input_file:org/apache/paimon/spark/SparkWrite.class */
public class SparkWrite implements V1Write {
    private final Table table;
    private final Lock.Factory lockFactory;
    private final CommitMessageSerializer serializer = new CommitMessageSerializer();

    /* loaded from: input_file:org/apache/paimon/spark/SparkWrite$ComputeBucket.class */
    private static class ComputeBucket implements Function<Row, Integer> {
        private final BatchWriteBuilder writeBuilder;
        private transient BatchTableWrite lazyWriter;

        private ComputeBucket(BatchWriteBuilder batchWriteBuilder) {
            this.writeBuilder = batchWriteBuilder;
        }

        private BatchTableWrite computer() {
            if (this.lazyWriter == null) {
                this.lazyWriter = this.writeBuilder.newWrite();
            }
            return this.lazyWriter;
        }

        public Integer call(Row row) {
            return Integer.valueOf(computer().getBucket(new SparkRow(this.writeBuilder.rowType(), row)));
        }
    }

    /* loaded from: input_file:org/apache/paimon/spark/SparkWrite$ListConcat.class */
    private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
        private ListConcat() {
        }

        public List<T> call(List<T> list, List<T> list2) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(list);
            arrayList.addAll(list2);
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/paimon/spark/SparkWrite$WriteRecords.class */
    private static class WriteRecords implements Function<Iterable<Row>, List<CommitMessage>> {
        private final BatchWriteBuilder writeBuilder;

        private WriteRecords(BatchWriteBuilder batchWriteBuilder) {
            this.writeBuilder = batchWriteBuilder;
        }

        public List<CommitMessage> call(Iterable<Row> iterable) throws Exception {
            BatchTableWrite newWrite = this.writeBuilder.newWrite();
            Throwable th = null;
            try {
                try {
                    Iterator<Row> it = iterable.iterator();
                    while (it.hasNext()) {
                        newWrite.write(new SparkRow(this.writeBuilder.rowType(), it.next()));
                    }
                    List<CommitMessage> prepareCommit = newWrite.prepareCommit();
                    if (newWrite != null) {
                        if (0 != 0) {
                            try {
                                newWrite.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newWrite.close();
                        }
                    }
                    return prepareCommit;
                } finally {
                }
            } catch (Throwable th3) {
                if (newWrite != null) {
                    if (th != null) {
                        try {
                            newWrite.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        newWrite.close();
                    }
                }
                throw th3;
            }
        }
    }

    public SparkWrite(Table table, Lock.Factory factory) {
        this.table = table;
        this.lockFactory = factory;
    }

    public InsertableRelation toInsertableRelation() {
        return (dataset, z) -> {
            if (z) {
                throw new UnsupportedOperationException("Overwrite is unsupported.");
            }
            BatchWriteBuilder newBatchWriteBuilder = this.table.newBatchWriteBuilder();
            List<CommitMessage> list = (List) dataset.toJavaRDD().groupBy(new ComputeBucket(newBatchWriteBuilder)).mapValues(new WriteRecords(newBatchWriteBuilder)).values().mapPartitions(SparkWrite::serializeCommitMessages).collect().stream().map(this::deserializeCommitMessage).collect(Collectors.toList());
            try {
                InnerTableCommit withLock = ((InnerTableCommit) newBatchWriteBuilder.newCommit()).withLock(this.lockFactory.create());
                Throwable th = null;
                try {
                    try {
                        withLock.commit(list);
                        if (withLock != null) {
                            if (0 != 0) {
                                try {
                                    withLock.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                withLock.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private static Iterator<byte[]> serializeCommitMessages(Iterator<List<CommitMessage>> it) {
        ArrayList arrayList = new ArrayList();
        CommitMessageSerializer commitMessageSerializer = new CommitMessageSerializer();
        while (it.hasNext()) {
            Iterator<CommitMessage> it2 = it.next().iterator();
            while (it2.hasNext()) {
                try {
                    arrayList.add(commitMessageSerializer.serialize(it2.next()));
                } catch (IOException e) {
                    throw new RuntimeException("Failed to serialize CommitMessage's object", e);
                }
            }
        }
        return arrayList.iterator();
    }

    private CommitMessage deserializeCommitMessage(byte[] bArr) {
        try {
            return this.serializer.deserialize(this.serializer.getVersion(), bArr);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize CommitMessage's object", e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1706300067:
                if (implMethodName.equals("serializeCommitMessages")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/paimon/spark/SparkWrite") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    return SparkWrite::serializeCommitMessages;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
