/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.table.serialization;

import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.DeleteOneModel;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.InsertOneModel;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.UpdateOneModel;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.UpdateOptions;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.WriteModel;
import org.apache.flink.mongodb.shaded.org.bson.BsonDocument;
import org.apache.flink.mongodb.shaded.org.bson.BsonValue;
import org.apache.flink.mongodb.shaded.org.bson.conversions.Bson;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;

/**
 * Serialization schema that turns {@link RowData} changelog rows into MongoDB
 * {@link WriteModel}s.
 *
 * <p>{@code INSERT} and {@code UPDATE_AFTER} rows are written as upserts keyed on {@code _id}
 * (or as plain inserts when the key function yields {@code null}); {@code UPDATE_BEFORE} and
 * {@code DELETE} rows are written as single-document deletes keyed on {@code _id}.
 */
@Internal
public class MongoRowDataSerializationSchema implements MongoSerializationSchema<RowData> {
    /** Name of the document field used to address a single document. */
    private static final String ID_FIELD = "_id";

    /** Update operator wrapping the fields written by an upsert. */
    private static final String SET_OPERATOR = "$set";

    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
    private final Function<RowData, BsonValue> createKey;

    /**
     * Creates the schema.
     *
     * @param rowDataToBsonConverter converts a whole row into a BSON document
     * @param createKey derives the {@code _id} value of a row; a {@code null} result makes
     *     upsert-kind rows fall back to a plain insert
     */
    public MongoRowDataSerializationSchema(
            RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter,
            Function<RowData, BsonValue> createKey) {
        this.rowDataToBsonConverter = rowDataToBsonConverter;
        this.createKey = createKey;
    }

    /**
     * Maps one changelog row to the write operation matching its row kind.
     *
     * @param element the row to serialize
     * @param context sink context; not consulted by this implementation
     * @return an upsert/insert model for {@code INSERT} and {@code UPDATE_AFTER}, a delete model
     *     for {@code UPDATE_BEFORE} and {@code DELETE}
     * @throws TableException if the row kind is none of the four handled kinds
     */
    @Override
    public WriteModel<BsonDocument> serialize(RowData element, MongoSinkContext context) {
        switch (element.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                return this.processUpsert(element);
            case UPDATE_BEFORE:
            case DELETE:
                return this.processDelete(element);
            default:
                throw new TableException("Unsupported message kind: " + element.getRowKind());
        }
    }

    /**
     * Builds the write for a row that should exist after the operation: an upserting
     * {@code $set} update when a key is available, otherwise a plain insert.
     */
    private WriteModel<BsonDocument> processUpsert(RowData row) {
        BsonDocument body = this.rowDataToBsonConverter.convert(row);
        BsonValue id = this.createKey.apply(row);
        if (id == null) {
            // No key to match on: the converted document is inserted unchanged.
            return new InsertOneModel<>(body);
        }

        // The key already lives in the filter, so it is stripped from the $set payload.
        // NOTE(review): presumably because MongoDB rejects updates that modify _id - confirm.
        body.remove(ID_FIELD);
        UpdateOptions upsertEnabled = new UpdateOptions().upsert(true);
        return new UpdateOneModel<>(
                filterById(id), new BsonDocument(SET_OPERATOR, body), upsertEnabled);
    }

    /** Builds the write that deletes the single document whose {@code _id} equals the row's key. */
    private WriteModel<BsonDocument> processDelete(RowData row) {
        return new DeleteOneModel<>(filterById(this.createKey.apply(row)));
    }

    /** Creates the {@code {_id: key}} filter document shared by upserts and deletes. */
    private static BsonDocument filterById(BsonValue key) {
        return new BsonDocument(ID_FIELD, key);
    }
}

