package org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader;

import com.mongodb.client.MongoCursor;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.class */
public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
    private static final Logger log = LoggerFactory.getLogger(MongodbReader.class);
    private final Queue<MongoSplit> pendingSplits = new ConcurrentLinkedDeque();
    private final DocumentDeserializer<SeaTunnelRow> deserializer;
    private final SourceReader.Context context;
    private final MongodbClientProvider clientProvider;
    private MongoCursor<BsonDocument> cursor;
    private final MongodbReadOptions readOptions;
    private volatile boolean noMoreSplit;

    public MongodbReader(SourceReader.Context context, MongodbClientProvider mongodbClientProvider, DocumentDeserializer<SeaTunnelRow> documentDeserializer, MongodbReadOptions mongodbReadOptions) {
        this.deserializer = documentDeserializer;
        this.context = context;
        this.clientProvider = mongodbClientProvider;
        this.readOptions = mongodbReadOptions;
    }

    public void open() {
        if (this.cursor != null) {
            this.cursor.close();
        }
    }

    public void close() {
        if (this.cursor != null) {
            this.cursor.close();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> collector) {
        synchronized (collector.getCheckpointLock()) {
            MongoSplit poll = this.pendingSplits.poll();
            if (poll != null) {
                if (this.cursor != null) {
                    return;
                }
                log.info("Prepared to read split {}", poll.splitId());
                try {
                    getCursor(poll);
                    Stream<BsonDocument> cursorToStream = cursorToStream();
                    DocumentDeserializer<SeaTunnelRow> documentDeserializer = this.deserializer;
                    documentDeserializer.getClass();
                    Stream<R> map = cursorToStream.map(documentDeserializer::deserialize);
                    collector.getClass();
                    map.forEach((v1) -> {
                        r1.collect(v1);
                    });
                    closeCurrentSplit();
                } catch (Throwable th) {
                    closeCurrentSplit();
                    throw th;
                }
            }
            if (this.noMoreSplit && this.pendingSplits.isEmpty()) {
                log.info("Closed the bounded mongodb source");
                this.context.signalNoMoreElement();
            }
        }
    }

    private void getCursor(MongoSplit mongoSplit) {
        this.cursor = this.clientProvider.getDefaultCollection().find(mongoSplit.getQuery()).projection(mongoSplit.getProjection()).batchSize2(this.readOptions.getFetchSize()).noCursorTimeout(this.readOptions.isNoCursorTimeout()).maxTime(this.readOptions.getMaxTimeMS(), TimeUnit.MINUTES).iterator();
    }

    private Stream<BsonDocument> cursorToStream() {
        Iterable iterable = () -> {
            return this.cursor;
        };
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public List<MongoSplit> snapshotState(long j) {
        return new ArrayList(this.pendingSplits);
    }

    public void addSplits(List<MongoSplit> list) {
        log.info("Adding split(s) to reader: {}", list);
        this.pendingSplits.addAll(list);
    }

    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this reader will not add new split.");
        this.noMoreSplit = true;
    }

    public void notifyCheckpointComplete(long j) {
    }

    private void closeCurrentSplit() {
        Preconditions.checkNotNull(this.cursor);
        this.cursor.close();
        this.cursor = null;
    }
}
