package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect;

import com.mongodb.client.MongoClient;
import io.debezium.relational.TableId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbScanFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbStreamFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.splitters.MongodbChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.CollectionDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.class */
/**
 * {@link DataSourceDialect} implementation for the MongoDB CDC source.
 *
 * <p>Creates the chunk splitter, the fetch tasks and the fetch-task context for a
 * {@link MongodbSourceConfig}, and memoizes the database/collection discovery result per
 * config instance so discovery is not repeated for a config that has already been resolved.
 */
public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
    private static final Logger log = LoggerFactory.getLogger(MongodbDialect.class);

    /**
     * Discovery results keyed by the source config they were computed for.
     *
     * <p>NOTE(review): using the config as a map key relies on its {@code equals}/{@code hashCode};
     * confirm that {@link MongodbSourceConfig} defines them as intended.
     */
    private final Map<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
            new ConcurrentHashMap<>();

    /**
     * Returns the dialect name.
     *
     * @return {@link MongodbSourceOptions#DIALECT_NAME}
     */
    @Override
    public String getName() {
        return MongodbSourceOptions.DIALECT_NAME;
    }

    /**
     * Lists the collections selected by the given config as {@link TableId}s.
     *
     * @param mongodbSourceConfig source configuration used for (cached) discovery
     * @return one {@link TableId} per discovered collection name, parsed with {@link TableId#parse}
     */
    @Override
    public List<TableId> discoverDataCollections(MongodbSourceConfig mongodbSourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
                discoverAndCacheDataCollections(mongodbSourceConfig);
        return discoveryInfo.getDiscoveredCollections().stream()
                .map(TableId::parse)
                .collect(Collectors.toList());
    }

    /**
     * Reports whether collection identifiers are case sensitive.
     *
     * @param mongodbSourceConfig unused
     * @return always {@code true}
     */
    @Override
    public boolean isDataCollectionIdCaseSensitive(MongodbSourceConfig mongodbSourceConfig) {
        return true;
    }

    /**
     * Creates the splitter used to chunk collections.
     *
     * @param mongodbSourceConfig source configuration handed to the splitter
     * @return a new {@link MongodbChunkSplitter}
     */
    @Override
    public ChunkSplitter createChunkSplitter(MongodbSourceConfig mongodbSourceConfig) {
        return new MongodbChunkSplitter(mongodbSourceConfig);
    }

    /**
     * Creates the fetch task matching the kind of split.
     *
     * @param sourceSplitBase the split to read
     * @return a {@link MongodbScanFetchTask} for a snapshot split, otherwise a
     *     {@link MongodbStreamFetchTask} built from the incremental view of the split
     */
    @Override
    public FetchTask<SourceSplitBase> createFetchTask(@Nonnull SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new MongodbScanFetchTask(sourceSplitBase.asSnapshotSplit());
        }
        return new MongodbStreamFetchTask(sourceSplitBase.asIncrementalSplit());
    }

    /**
     * Creates the context shared by fetch tasks of this dialect.
     *
     * @param sourceSplitBase unused by this implementation
     * @param mongodbSourceConfig source configuration used for discovery and passed to the context
     * @return a new {@link MongodbFetchTaskContext} bound to this dialect, the config and the
     *     change stream descriptor derived from the discovered databases and collections
     */
    @Override
    public FetchTask.Context createFetchTaskContext(
            SourceSplitBase sourceSplitBase, MongodbSourceConfig mongodbSourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
                discoverAndCacheDataCollections(mongodbSourceConfig);
        return new MongodbFetchTaskContext(
                this,
                mongodbSourceConfig,
                MongodbUtils.getChangeStreamDescriptor(
                        mongodbSourceConfig,
                        discoveryInfo.getDiscoveredDatabases(),
                        discoveryInfo.getDiscoveredCollections()));
    }

    /**
     * Returns the discovery result for the given config, computing and caching it on first use.
     *
     * <p>NOTE(review): the mapping function talks to MongoDB while
     * {@link ConcurrentHashMap#computeIfAbsent} is in progress; the JDK recommends keeping such
     * computations short, so confirm this is acceptable for the expected call pattern.
     *
     * @param mongodbSourceConfig source configuration that selects databases and collections
     * @return the cached or freshly computed discovery info
     */
    private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(
            MongodbSourceConfig mongodbSourceConfig) {
        // The lambda receives the map key, i.e. the very config passed in, so it uses its own
        // parameter rather than capturing the outer variable.
        return this.cache.computeIfAbsent(
                mongodbSourceConfig,
                config -> {
                    MongoClient mongoClient = MongodbUtils.createMongoClient(config);
                    List<String> databaseNames =
                            CollectionDiscoveryUtils.databaseNames(
                                    mongoClient,
                                    CollectionDiscoveryUtils.databaseFilter(
                                            config.getDatabaseList()));
                    List<String> collectionNames =
                            CollectionDiscoveryUtils.collectionNames(
                                    mongoClient,
                                    databaseNames,
                                    CollectionDiscoveryUtils.collectionsFilter(
                                            config.getCollectionList()));
                    return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(
                            databaseNames, collectionNames);
                });
    }

    /**
     * Determines the current change stream offset for the configured databases and collections.
     *
     * <p>Uses the latest resume token when one is available; otherwise falls back to an offset
     * built from the current cluster time.
     *
     * <p>NOTE(review): the client obtained here is not closed; presumably
     * {@code MongodbUtils.createMongoClient} hands out a shared instance — confirm before adding
     * any close handling.
     *
     * @param mongodbSourceConfig source configuration used to connect and to discover collections
     * @return the offset derived from the resume token, or from the cluster time if no token exists
     */
    public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig mongodbSourceConfig) {
        MongoClient mongoClient = MongodbUtils.createMongoClient(mongodbSourceConfig);
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
                discoverAndCacheDataCollections(mongodbSourceConfig);
        BsonDocument startupResumeToken =
                MongodbUtils.getLatestResumeToken(
                        mongoClient,
                        MongodbUtils.getChangeStreamDescriptor(
                                mongodbSourceConfig,
                                discoveryInfo.getDiscoveredDatabases(),
                                discoveryInfo.getDiscoveredCollections()));

        if (startupResumeToken == null) {
            // No resume token available: start from the cluster's current time instead.
            return new ChangeStreamOffset(MongodbUtils.getCurrentClusterTime(mongoClient));
        }

        ChangeStreamOffset changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
        log.info("startup resume token={},change stream offset={}", startupResumeToken, changeStreamOffset);
        return changeStreamOffset;
    }
}
