package org.apache.eventmesh.connector.mongodb.sink.client;

import com.mongodb.BasicDBObject;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import io.cloudevents.CloudEvent;
import org.apache.eventmesh.connector.mongodb.constant.MongodbConstants;
import org.apache.eventmesh.connector.mongodb.sink.client.Impl.MongodbSinkClient;
import org.apache.eventmesh.connector.mongodb.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.mongodb.utils.MongodbCloudEventUtil;
import org.bson.Document;

/* loaded from: input_file:org/apache/eventmesh/connector/mongodb/sink/client/MongodbStandaloneSinkClient.class */
public class MongodbStandaloneSinkClient implements MongodbSinkClient {
    private final SinkConnectorConfig connectorConfig;
    private volatile boolean started = false;
    private MongoClient client;
    private MongoCollection<Document> cappedCol;
    private MongoCollection<Document> seqCol;

    public MongodbStandaloneSinkClient(SinkConnectorConfig sinkConnectorConfig) {
        this.connectorConfig = sinkConnectorConfig;
    }

    @Override // org.apache.eventmesh.connector.mongodb.sink.client.Impl.MongodbSinkClient
    public void init() {
        this.client = MongoClients.create(new ConnectionString(this.connectorConfig.getUrl()));
        MongoDatabase database = this.client.getDatabase(this.connectorConfig.getDatabase());
        this.cappedCol = database.getCollection(this.connectorConfig.getCollection());
        this.seqCol = database.getCollection(MongodbConstants.SEQUENCE_COLLECTION_NAME);
    }

    @Override // org.apache.eventmesh.connector.mongodb.sink.client.Impl.MongodbSinkClient
    public void start() {
        if (this.started) {
            return;
        }
        this.started = true;
    }

    @Override // org.apache.eventmesh.connector.mongodb.sink.client.Impl.MongodbSinkClient
    public void publish(CloudEvent cloudEvent) {
        Document convertToDocument = MongodbCloudEventUtil.convertToDocument(cloudEvent);
        convertToDocument.append("topic", MongodbConstants.TOPIC).append(MongodbConstants.CAPPED_COL_CURSOR_FN, Integer.valueOf(getNextSeq(MongodbConstants.TOPIC)));
        this.cappedCol.insertOne(convertToDocument);
    }

    @Override // org.apache.eventmesh.connector.mongodb.sink.client.Impl.MongodbSinkClient
    public void stop() {
        if (this.started) {
            try {
                this.client.close();
            } finally {
                this.started = false;
            }
        }
    }

    public int getNextSeq(String str) {
        Document document = new Document("topic", str);
        Document document2 = new Document("$inc", new BasicDBObject(MongodbConstants.SEQUENCE_VALUE_FN, 1));
        FindOneAndUpdateOptions findOneAndUpdateOptions = new FindOneAndUpdateOptions();
        findOneAndUpdateOptions.upsert(true);
        findOneAndUpdateOptions.returnDocument(ReturnDocument.AFTER);
        return ((Integer) ((Document) this.seqCol.findOneAndUpdate(document, document2, findOneAndUpdateOptions)).get(MongodbConstants.SEQUENCE_VALUE_FN)).intValue();
    }
}
