package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;

import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

/* loaded from: input_file:com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.class */
public class MongoDbUpdate implements CdcOperation {
    private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
    private static final String JSON_DOC_FIELD_PATH = "patch";
    private static final String JSON_DOC_FIELD_AFTER = "after";
    public static final String INTERNAL_OPLOG_FIELD_V = "$v";
    private final EventFormat eventFormat;

    /* loaded from: input_file:com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate$EventFormat.class */
    public enum EventFormat {
        Oplog,
        ChangeStream
    }

    public MongoDbUpdate(EventFormat eventFormat) {
        this.eventFormat = eventFormat;
    }

    @Override // com.mongodb.kafka.connect.sink.cdc.CdcOperation
    public WriteModel<BsonDocument> perform(SinkDocument sinkDocument) {
        try {
            if (EventFormat.ChangeStream.equals(this.eventFormat)) {
                return handleChangeStreamEvent(sinkDocument);
            }
            if (EventFormat.Oplog.equals(this.eventFormat)) {
                return handleOplogEvent(sinkDocument);
            }
            throw new UnsupportedOperationException(String.format("Unsupported event format '%s'.", this.eventFormat));
        } catch (DataException e) {
            throw e;
        } catch (Exception e2) {
            throw new DataException(e2.getMessage(), e2);
        }
    }

    private WriteModel<BsonDocument> handleOplogEvent(SinkDocument sinkDocument) {
        BsonDocument documentValue = getDocumentValue(sinkDocument);
        if (!documentValue.containsKey("patch")) {
            throw new DataException(String.format("Update document missing `%s` field.", "patch"));
        }
        BsonDocument parse = BsonDocument.parse(documentValue.getString("patch").getValue());
        parse.remove((Object) INTERNAL_OPLOG_FIELD_V);
        return parse.containsKey("_id") ? new ReplaceOneModel(new BsonDocument("_id", parse.get((Object) "_id")), parse, REPLACE_OPTIONS) : new UpdateOneModel(getFilterDocByKeyId(sinkDocument), parse);
    }

    private WriteModel<BsonDocument> handleChangeStreamEvent(SinkDocument sinkDocument) {
        BsonDocument documentValue = getDocumentValue(sinkDocument);
        if (documentValue.containsKey("after")) {
            return new ReplaceOneModel(getFilterDocByKeyId(sinkDocument), BsonDocument.parse(documentValue.getString("after").getValue()), REPLACE_OPTIONS);
        }
        throw new DataException(String.format("Update document missing `%s` field.", "after"));
    }

    private BsonDocument getFilterDocByKeyId(SinkDocument sinkDocument) {
        BsonDocument documentKey = getDocumentKey(sinkDocument);
        if (documentKey.containsKey("id")) {
            return BsonDocument.parse(String.format("{%s: %s}", "_id", documentKey.getString("id").getValue()));
        }
        throw new DataException(String.format("Update document missing `%s` field.", "id"));
    }

    private BsonDocument getDocumentKey(SinkDocument sinkDocument) {
        return sinkDocument.getKeyDoc().orElseThrow(() -> {
            return new DataException("Key document must not be missing for update operation");
        });
    }

    private BsonDocument getDocumentValue(SinkDocument sinkDocument) {
        return sinkDocument.getValueDoc().orElseThrow(() -> {
            return new DataException("Value document must not be missing for update operation");
        });
    }
}
