package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSplitStrategy;
import org.bson.BsonDocument;

/**
 * SeaTunnel source that reads {@link SeaTunnelRow}s from MongoDB.
 *
 * <p>The source always reports itself as {@link Boundedness#BOUNDED} and produces exactly one
 * {@link CatalogTable}, the one supplied at construction time. Connection details (URI, database,
 * collection), split settings and read settings are all taken from the {@link ReadonlyConfig}
 * supplied at construction time.
 */
public class MongodbSource
        implements SeaTunnelSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>,
                SupportColumnProjection {
    private static final long serialVersionUID = 1L;

    /** Table description whose row type drives deserialization. */
    private final CatalogTable catalogTable;

    /** Connector options used to build the client provider, split strategy and read options. */
    private final ReadonlyConfig options;

    /**
     * Creates a source for the given table and connector options.
     *
     * @param catalogTable the table this source produces
     * @param readonlyConfig the connector options
     */
    public MongodbSource(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
        this.catalogTable = catalogTable;
        this.options = readonlyConfig;
    }

    /**
     * Returns the connector identity declared in {@link MongodbConfig}.
     *
     * @return the plugin name
     */
    @Override
    public String getPluginName() {
        return MongodbConfig.CONNECTOR_IDENTITY;
    }

    /**
     * Reports the boundedness of this source.
     *
     * @return always {@link Boundedness#BOUNDED}
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Returns the tables produced by this source.
     *
     * @return an immutable single-element list holding the configured catalog table
     */
    @Override
    public List<CatalogTable> getProducedCatalogTables() {
        return Collections.singletonList(this.catalogTable);
    }

    /**
     * Creates a reader backed by a freshly built client provider, a deserializer for the catalog
     * table's row type, and the configured read options.
     *
     * @param context the reader context handed over by the engine
     * @return a new {@link MongodbReader}
     */
    @Override
    public SourceReader<SeaTunnelRow, MongoSplit> createReader(SourceReader.Context context) {
        return new MongodbReader(
                context,
                createClientProvider(this.options),
                createDeserializer(this.options, this.catalogTable.getSeaTunnelRowType()),
                createMongodbReadOptions(this.options));
    }

    /**
     * Creates a split enumerator with no previously assigned state.
     *
     * @param context the enumerator context handed over by the engine
     * @return a new {@link MongodbSplitEnumerator}
     */
    @Override
    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> createEnumerator(
            SourceSplitEnumerator.Context<MongoSplit> context) {
        // The enumerator and its split strategy share one client provider instance.
        MongodbClientProvider clientProvider = createClientProvider(this.options);
        return new MongodbSplitEnumerator(
                context, clientProvider, createSplitStrategy(this.options, clientProvider));
    }

    /**
     * Creates a split enumerator that is seeded with previously checkpointed splits.
     *
     * @param context the enumerator context handed over by the engine
     * @param arrayList the splits restored from a checkpoint
     * @return a new {@link MongodbSplitEnumerator} initialised with {@code arrayList}
     */
    @Override
    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> restoreEnumerator(
            SourceSplitEnumerator.Context<MongoSplit> context, ArrayList<MongoSplit> arrayList) {
        // The enumerator and its split strategy share one client provider instance.
        MongodbClientProvider clientProvider = createClientProvider(this.options);
        return new MongodbSplitEnumerator(
                context,
                clientProvider,
                createSplitStrategy(this.options, clientProvider),
                arrayList);
    }

    /**
     * Builds a client provider from the URI, database and collection options.
     *
     * @param readonlyConfig the connector options
     * @return a new client provider
     */
    private MongodbClientProvider createClientProvider(ReadonlyConfig readonlyConfig) {
        return MongodbCollectionProvider.builder()
                .connectionString((String) readonlyConfig.get(MongodbConfig.URI))
                .database((String) readonlyConfig.get(MongodbConfig.DATABASE))
                .collection((String) readonlyConfig.get(MongodbConfig.COLLECTION))
                .build();
    }

    /**
     * Builds the document deserializer for the given row type.
     *
     * @param readonlyConfig the connector options; supplies the {@code FLAT_SYNC_STRING} flag
     * @param seaTunnelRowType the row type whose field names and types are deserialized
     * @return a new deserializer
     */
    private DocumentRowDataDeserializer createDeserializer(
            ReadonlyConfig readonlyConfig, SeaTunnelRowType seaTunnelRowType) {
        return new DocumentRowDataDeserializer(
                seaTunnelRowType.getFieldNames(),
                seaTunnelRowType,
                (Boolean) readonlyConfig.get(MongodbConfig.FLAT_SYNC_STRING));
    }

    /**
     * Builds a sampling split strategy from the split key and split size options, plus the match
     * query and projection when they are configured.
     *
     * @param readonlyConfig the connector options
     * @param mongodbClientProvider the client provider the strategy should use
     * @return a new split strategy
     */
    private MongoSplitStrategy createSplitStrategy(
            ReadonlyConfig readonlyConfig, MongodbClientProvider mongodbClientProvider) {
        SamplingSplitStrategy.Builder builder = SamplingSplitStrategy.builder();
        builder.setSplitKey((String) readonlyConfig.get(MongodbConfig.SPLIT_KEY));
        builder.setSizePerSplit((Long) readonlyConfig.get(MongodbConfig.SPLIT_SIZE));
        // Match query and projection are optional; each is parsed as a BSON document if present.
        readonlyConfig
                .getOptional(MongodbConfig.MATCH_QUERY)
                .ifPresent(matchQuery -> builder.setMatchQuery(BsonDocument.parse(matchQuery)));
        readonlyConfig
                .getOptional(MongodbConfig.PROJECTION)
                .ifPresent(projection -> builder.setProjection(BsonDocument.parse(projection)));
        return builder.setClientProvider(mongodbClientProvider).build();
    }

    /**
     * Builds the reader options (max time, fetch size, cursor timeout flag) from the connector
     * options.
     *
     * @param readonlyConfig the connector options
     * @return the read options
     */
    private MongodbReadOptions createMongodbReadOptions(ReadonlyConfig readonlyConfig) {
        MongodbReadOptions.MongoReadOptionsBuilder builder = MongodbReadOptions.builder();
        // NOTE(review): the option is named MAX_TIME_MIN but feeds setMaxTimeMS; confirm which
        // unit MongodbReadOptions expects before changing either side.
        builder.setMaxTimeMS((Long) readonlyConfig.get(MongodbConfig.MAX_TIME_MIN));
        builder.setFetchSize((Integer) readonlyConfig.get(MongodbConfig.FETCH_SIZE));
        builder.setNoCursorTimeout((Boolean) readonlyConfig.get(MongodbConfig.CURSOR_NO_TIMEOUT));
        return builder.build();
    }
}
