package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataDocumentSerializer.class */
/**
 * Serializes a {@link SeaTunnelRow} into a MongoDB {@link WriteModel}, selecting the kind of write
 * from the row's {@link RowKind}.
 *
 * <ul>
 *   <li>{@code INSERT} - an {@link InsertOneModel}, or an upserting {@link UpdateOneModel} when
 *       upsert is enabled in the writer options.
 *   <li>{@code UPDATE_AFTER} - an {@link UpdateOneModel}; it upserts only when upsert is enabled.
 *   <li>{@code DELETE} - a {@link DeleteOneModel}.
 * </ul>
 *
 * <p>Every other row kind is rejected by {@link #serializeToWriteModel(SeaTunnelRow)}.
 */
public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {
    private static final String ID_FIELD = "_id";
    private static final String SET_OPERATOR = "$set";

    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
    private final boolean isUpsertEnable;
    private final Function<BsonDocument, BsonDocument> filterConditions;
    private final Map<RowKind, WriteModelSupplier> writeModelSuppliers;

    /** Builds the write model for a single row; one supplier is registered per supported row kind. */
    public interface WriteModelSupplier {
        /**
         * @param seaTunnelRow the row to serialize
         * @return the write model representing the row
         */
        WriteModel<BsonDocument> get(SeaTunnelRow seaTunnelRow);
    }

    /**
     * @param rowDataToBsonConverter converts a row into the BSON document to be written
     * @param mongodbWriterOptions writer options; only the upsert flag is read here
     * @param function derives, from a converted document, the document whose entries are used as
     *     the equality filter for update and delete operations
     */
    public RowDataDocumentSerializer(RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter, MongodbWriterOptions mongodbWriterOptions, Function<BsonDocument, BsonDocument> function) {
        this.rowDataToBsonConverter = rowDataToBsonConverter;
        this.isUpsertEnable = mongodbWriterOptions.isUpsertEnable();
        this.filterConditions = function;
        // Must run last: the supplier table reads isUpsertEnable eagerly. Building it from a field
        // initializer would execute before the assignments above and always observe 'false'.
        this.writeModelSuppliers = createWriteModelSuppliers();
    }

    /**
     * Converts the row into the write model registered for its row kind.
     *
     * @param seaTunnelRow the row to serialize
     * @return the write model for the row
     * @throws MongodbConnectorException if no supplier is registered for the row's kind
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer
    public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow seaTunnelRow) {
        WriteModelSupplier writeModelSupplier = this.writeModelSuppliers.get(seaTunnelRow.getRowKind());
        if (writeModelSupplier == null) {
            throw new MongodbConnectorException(CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unsupported message kind: " + seaTunnelRow.getRowKind());
        }
        return writeModelSupplier.get(seaTunnelRow);
    }

    /** Creates the row-kind to supplier table, honoring the configured upsert flag. */
    private Map<RowKind, WriteModelSupplier> createWriteModelSuppliers() {
        WriteModelSupplier upsert = row -> buildUpdateModel(row, true);
        WriteModelSupplier update = row -> buildUpdateModel(row, false);
        WriteModelSupplier insert = row -> new InsertOneModel<>(this.rowDataToBsonConverter.convert(row));
        WriteModelSupplier delete = row -> new DeleteOneModel<>(generateFilter(this.filterConditions.apply(this.rowDataToBsonConverter.convert(row))));

        Map<RowKind, WriteModelSupplier> suppliers = new HashMap<>();
        suppliers.put(RowKind.INSERT, this.isUpsertEnable ? upsert : insert);
        suppliers.put(RowKind.UPDATE_AFTER, this.isUpsertEnable ? upsert : update);
        suppliers.put(RowKind.DELETE, delete);
        return suppliers;
    }

    /** Builds a {@code $set} update for the row, optionally as an upsert. */
    private WriteModel<BsonDocument> buildUpdateModel(SeaTunnelRow row, boolean upsert) {
        BsonDocument document = this.rowDataToBsonConverter.convert(row);
        // The filter is derived from the full document, before "_id" is stripped below.
        Bson filter = generateFilter(this.filterConditions.apply(document));
        // "_id" is kept out of the $set payload so the update never attempts to modify it.
        document.remove(ID_FIELD);
        return new UpdateOneModel<>(filter, new BsonDocument(SET_OPERATOR, document), new UpdateOptions().upsert(upsert));
    }

    /**
     * Builds a filter that requires every entry of the given document to match by equality.
     *
     * @param bsonDocument document whose key/value pairs become equality conditions
     * @return the conjunction ({@code and}) of one {@code eq} condition per entry
     */
    public static Bson generateFilter(BsonDocument bsonDocument) {
        return Filters.and(
                bsonDocument.entrySet().stream()
                        .map(entry -> Filters.eq(entry.getKey(), entry.getValue()))
                        .collect(Collectors.toList()));
    }
}
