package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import java.io.IOException;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Deserializer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.class */
/**
 * Single-split source reader that scans one MongoDB collection and emits each document as a
 * {@link SeaTunnelRow}.
 *
 * <p>Two output modes are supported, chosen at construction time:
 * <ul>
 *   <li><b>simple text schema</b> - every document is emitted as a one-field row holding the
 *       document's JSON string ({@link Document#toJson()}); no deserializer is created and a
 *       {@code null} projection is handed to the driver;</li>
 *   <li><b>typed schema</b> - documents are converted by a {@link DefaultDeserializer} built from
 *       the row type, and the query projects only the row type's field names while excluding
 *       {@code _id}.</li>
 * </ul>
 */
public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(MongodbSourceReader.class);
    private final SingleSplitReaderContext context;
    // Created in open() and released in close(); null before open() has been called.
    private MongoClient client;
    private final MongodbParameters params;
    // Null when useSimpleTextSchema is true.
    private final Deserializer deserializer;
    // Null when useSimpleTextSchema is true.
    private final Bson projectionFields;
    private final boolean useSimpleTextSchema;

    /**
     * Creates a reader for the collection described by {@code mongodbParameters}.
     *
     * @param singleSplitReaderContext reader context used to query boundedness and to signal the
     *     end of input
     * @param mongodbParameters connection settings (URI, database and collection are read from it)
     * @param seaTunnelRowType row type used to build the deserializer and the field projection;
     *     not accessed when {@code useSimpleTextSchema} is {@code true}
     * @param useSimpleTextSchema when {@code true}, emit each document as a single JSON string
     *     field instead of deserializing it against {@code seaTunnelRowType}
     */
    public MongodbSourceReader(SingleSplitReaderContext singleSplitReaderContext, MongodbParameters mongodbParameters, SeaTunnelRowType seaTunnelRowType, boolean useSimpleTextSchema) {
        this.context = singleSplitReaderContext;
        this.params = mongodbParameters;
        this.useSimpleTextSchema = useSimpleTextSchema;
        if (useSimpleTextSchema) {
            this.deserializer = null;
            this.projectionFields = null;
        } else {
            this.deserializer = new DefaultDeserializer(seaTunnelRowType);
            this.projectionFields = Projections.fields(
                    Projections.include(seaTunnelRowType.getFieldNames()),
                    Projections.excludeId());
        }
    }

    /**
     * Creates the MongoDB client from the configured URI.
     *
     * @throws Exception if the client cannot be created
     */
    public void open() throws Exception {
        this.client = MongoClients.create(this.params.getUri());
    }

    /**
     * Closes the MongoDB client if one was created by {@link #open()}.
     *
     * @throws IOException declared by the reader contract
     */
    public void close() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Runs a find over the configured collection and forwards every returned document to
     * {@code collector}.
     *
     * <p>The cursor is always closed, including when reading, deserializing or collecting fails.
     * After the scan (successful or not), a bounded source signals the context that no more
     * elements will be produced.
     *
     * @param collector sink receiving one row per document
     * @throws Exception if querying, deserializing or collecting fails; the failure is rethrown
     *     after the cursor has been closed and the end-of-input signal has been sent
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        try (MongoCursor<Document> cursor = this.client
                .getDatabase(this.params.getDatabase())
                .getCollection(this.params.getCollection())
                .find()
                .projection(this.projectionFields)
                .iterator()) {
            while (cursor.hasNext()) {
                Document document = cursor.next();
                if (this.useSimpleTextSchema) {
                    // Single-column row carrying the raw document JSON.
                    collector.collect(new SeaTunnelRow(new Object[]{document.toJson()}));
                } else {
                    collector.collect(this.deserializer.deserialize(document));
                }
            }
        } finally {
            // Runs after the cursor has been closed, on both the success and the failure path.
            if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
                log.info("Closed the bounded mongodb source");
                this.context.signalNoMoreElement();
            }
        }
    }
}
