package org.apache.eventmesh.connector.mongodb.source.client;

import com.mongodb.ConnectionString;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.cloudevents.CloudEvent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.connector.mongodb.constant.MongodbConstants;
import org.apache.eventmesh.connector.mongodb.source.client.Impl.MongodbSourceClient;
import org.apache.eventmesh.connector.mongodb.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.mongodb.utils.MongodbCloudEventUtil;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/mongodb/source/client/MongodbStandaloneSourceClient.class */
/**
 * {@link MongodbSourceClient} that polls one MongoDB collection from a background task and
 * hands every matching document, converted to a {@link CloudEvent}, to a caller-supplied queue.
 *
 * <p>Lifecycle: {@link #init()} opens the connection, {@link #start()} submits the polling task,
 * {@link #stop()} signals the task to finish, shuts the executor down and closes the connection.
 * A stopped instance cannot be started again.
 */
public class MongodbStandaloneSourceClient implements MongodbSourceClient {
    private static final Logger log = LoggerFactory.getLogger(MongodbStandaloneSourceClient.class);

    /** How long a single enqueue attempt waits before the stop flag is checked again. */
    private static final long OFFER_TIMEOUT_MILLIS = 100L;

    private final SourceConnectorConfig connectorConfig;
    private MongoClient client;
    private MongoCollection<Document> cappedCol;
    private final BlockingQueue<CloudEvent> queue;
    private volatile boolean started = false;
    private final SubTask task = new SubTask();
    private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, "EventMesh-MongodbStandaloneSourceClient-");

    /**
     * Polling loop: repeatedly queries the collection for documents whose position field is
     * greater than the last position handed to the queue, until {@link #stop()} is called.
     */
    private class SubTask implements Runnable {
        private final AtomicBoolean stop = new AtomicBoolean(false);

        @Override
        public void run() {
            // Last position successfully handed to the queue; -1 is the initial lower bound.
            int position = -1;
            while (!stop.get()) {
                // try-with-resources: the cursor is closed even when the loop body throws
                // part-way through the result set.
                try (MongoCursor<Document> cursor = getCursor(cappedCol, MongodbConstants.TOPIC, position).iterator()) {
                    while (cursor.hasNext()) {
                        Document document = cursor.next();
                        CloudEvent event = MongodbCloudEventUtil.convertToCloudEvent(document);
                        // Read the position before enqueueing so that a document with an
                        // unreadable position is not enqueued again on every pass.
                        int next = readPosition(document);
                        if (!enqueue(event)) {
                            return; // stop requested while waiting for queue space
                        }
                        position = next;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag for the owning executor and end the task.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    log.error("[MongodbStandaloneSourceClient] thread run happen exception.", e);
                }
                // NOTE(review): with no new documents this loop re-queries immediately; only
                // Thread.yield() separates two polls. Consider an idle delay if that load matters.
                Thread.yield();
            }
        }

        /**
         * Offers the event to the queue, waiting for space if the queue is full.
         *
         * @return {@code true} once the event is queued, {@code false} if a stop was requested
         *         while waiting
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private boolean enqueue(CloudEvent event) throws InterruptedException {
            while (!queue.offer(event, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                if (stop.get()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Extracts the position field as an {@code int}; any {@link Number} subtype is accepted
         * and narrowed with {@link Number#intValue()}.
         *
         * @throws IllegalStateException if the field is missing or not numeric
         */
        private int readPosition(Document document) {
            Object value = document.get(MongodbConstants.CAPPED_COL_CURSOR_FN);
            if (value instanceof Number) {
                return ((Number) value).intValue();
            }
            throw new IllegalStateException("Field '" + MongodbConstants.CAPPED_COL_CURSOR_FN + "' is not numeric: " + value);
        }

        /** Asks the polling loop to finish; takes effect at the next flag check. */
        public void stop() {
            this.stop.set(true);
        }
    }

    /**
     * @param sourceConnectorConfig supplies the connection URL, database and collection names
     * @param blockingQueue         queue that receives the converted events
     */
    public MongodbStandaloneSourceClient(SourceConnectorConfig sourceConnectorConfig, BlockingQueue<CloudEvent> blockingQueue) {
        this.queue = blockingQueue;
        this.connectorConfig = sourceConnectorConfig;
    }

    /**
     * Connects to MongoDB, resolves the configured collection and creates an ascending index on
     * the position field and {@code topic}, the two fields used by {@link #getCursor}.
     */
    @Override
    public void init() {
        this.client = MongoClients.create(new ConnectionString(this.connectorConfig.getUrl()));
        this.cappedCol = this.client.getDatabase(this.connectorConfig.getDatabase()).getCollection(this.connectorConfig.getCollection());
        this.cappedCol.createIndex(new Document(MongodbConstants.CAPPED_COL_CURSOR_FN, 1).append("topic", 1));
    }

    /** Submits the polling task; does nothing if already started or already stopped. */
    @Override
    public void start() {
        if (this.started) {
            return;
        }
        if (this.executor.isShutdown()) {
            // stop() is terminal: the stop flag stays set and the connection is closed.
            log.warn("[MongodbStandaloneSourceClient] start ignored, client has already been stopped.");
            return;
        }
        this.executor.execute(this.task);
        this.started = true;
    }

    /**
     * Signals the polling task to finish, shuts the executor down so its worker thread can
     * terminate, and closes the MongoDB connection. Does nothing if not started.
     */
    @Override
    public void stop() {
        if (!this.started) {
            return;
        }
        try {
            this.task.stop();
            // No new tasks are accepted; the running task ends once it sees the stop flag.
            this.executor.shutdown();
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            this.started = false;
        }
    }

    /**
     * Builds the query for documents whose position field is greater than {@code i} and whose
     * {@code topic} equals {@code str}.
     *
     * @param mongoCollection collection to query
     * @param str             topic value to match
     * @param i               exclusive lower bound for the position field
     * @return the not-yet-executed find operation
     */
    public FindIterable<Document> getCursor(MongoCollection<Document> mongoCollection, String str, int i) {
        Document filter = new Document(MongodbConstants.CAPPED_COL_CURSOR_FN, new Document("$gt", i)).append("topic", str);
        return mongoCollection.find(filter);
    }
}
