package org.apache.eventmesh.connector.mongodb.source.client;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.cloudevents.CloudEvent;
import java.util.concurrent.BlockingQueue;
import org.apache.eventmesh.connector.mongodb.source.client.Impl.MongodbSourceClient;
import org.apache.eventmesh.connector.mongodb.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.mongodb.utils.MongodbCloudEventUtil;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/mongodb/source/client/MongodbReplicaSetSourceClient.class */
public class MongodbReplicaSetSourceClient implements MongodbSourceClient {
    private static final Logger log = LoggerFactory.getLogger(MongodbReplicaSetSourceClient.class);
    private final SourceConnectorConfig connectorConfig;
    private volatile boolean started = false;
    private MongoClient client;
    private MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
    private final BlockingQueue<CloudEvent> queue;

    public MongodbReplicaSetSourceClient(SourceConnectorConfig sourceConnectorConfig, BlockingQueue<CloudEvent> blockingQueue) {
        this.queue = blockingQueue;
        this.connectorConfig = sourceConnectorConfig;
    }

    @Override // org.apache.eventmesh.connector.mongodb.source.client.Impl.MongodbSourceClient
    public void init() {
        this.client = MongoClients.create(new ConnectionString(this.connectorConfig.getUrl()));
    }

    @Override // org.apache.eventmesh.connector.mongodb.source.client.Impl.MongodbSourceClient
    public void start() {
        if (this.started) {
            return;
        }
        this.cursor = this.client.getDatabase(this.connectorConfig.getDatabase()).getCollection(this.connectorConfig.getCollection()).watch().cursor();
        handle();
        this.started = true;
    }

    @Override // org.apache.eventmesh.connector.mongodb.source.client.Impl.MongodbSourceClient
    public void stop() {
        if (this.started) {
            try {
                this.client.close();
                this.cursor.close();
            } finally {
                this.started = false;
            }
        }
    }

    private void handle() {
        while (this.cursor.hasNext()) {
            Document document = (Document) ((ChangeStreamDocument) this.cursor.next()).getFullDocument();
            if (document != null) {
                try {
                    this.queue.add(MongodbCloudEventUtil.convertToCloudEvent(document));
                } catch (Exception e) {
                    log.error("[MongodbReplicaSetSourceClient] happen exception.", e);
                }
            }
        }
    }
}
