/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.dialect;

import com.mongodb.client.MongoClient;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataSourceDialect} implementation for MongoDB sources.
 *
 * <p>Collection discovery results are memoized per {@link MongoDBSourceConfig} in an internal
 * map, so repeated calls with the same config reuse the first discovery result. The
 * collection-name predicate used by {@link #isIncludeDataCollection} is created lazily from
 * the first config passed to that method and reused for all later calls.
 */
@Experimental
public class MongoDBDialect implements DataSourceDialect<MongoDBSourceConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBDialect.class);

    /** Discovery results keyed by the source config that produced them. */
    private final Map<MongoDBSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache = new ConcurrentHashMap<>();

    /** Lazily created by {@link #isIncludeDataCollection}; {@code null} until first use. */
    private transient Predicate<String> collectionsFilter;

    /** Returns the constant name of this dialect. */
    @Override
    public String getName() {
        return "MongoDB";
    }

    /** Parses {@code str} treating the part before the first dot as the catalog. */
    private static TableId parseTableId(String str) {
        return MongoDBDialect.parseTableId(str, true);
    }

    /**
     * Splits {@code str} at its first dot and builds a {@link TableId} from the pieces.
     *
     * @param str the identifier text; without a dot the whole text becomes the table name
     * @param useCatalogBeforeSchema when {@code true} the prefix before the dot is used as the
     *     catalog, otherwise it is used as the schema
     * @return the parsed id
     */
    private static TableId parseTableId(String str, boolean useCatalogBeforeSchema) {
        // A limit of 2 means everything after the first dot stays in the second element.
        String[] segments = str.split("[.]", 2);
        switch (segments.length) {
            case 1:
                return new TableId(null, null, segments[0]);
            case 2:
                if (useCatalogBeforeSchema) {
                    return new TableId(segments[0], null, segments[1]);
                }
                return new TableId(null, segments[0], segments[1]);
            default:
                return null;
        }
    }

    /**
     * Returns the discovered collections for {@code sourceConfig}, each converted to a
     * {@link TableId} with the prefix before the first dot as its catalog.
     */
    @Override
    public List<TableId> discoverDataCollections(MongoDBSourceConfig sourceConfig) {
        List<String> collectionNames = this.discoverAndCacheDataCollections(sourceConfig).getDiscoveredCollections();
        return collectionNames.stream()
                .map(name -> MongoDBDialect.parseTableId(name))
                .collect(Collectors.toList());
    }

    /**
     * Builds a map from every discovered collection id to the schema produced by
     * {@link #collectionSchema(TableId)}.
     */
    @Override
    public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(MongoDBSourceConfig sourceConfig) {
        List<TableId> collectionIds = this.discoverDataCollections(sourceConfig);
        HashMap<TableId, TableChanges.TableChange> result = new HashMap<>(collectionIds.size());
        collectionIds.forEach(id -> result.put(id, MongoDBDialect.collectionSchema(id)));
        return result;
    }

    /** Returns the memoized discovery result for {@code sourceConfig}, computing it on first use. */
    private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(MongoDBSourceConfig sourceConfig) {
        return this.cache.computeIfAbsent(sourceConfig, ignoredKey -> MongoDBDialect.runDiscovery(sourceConfig));
    }

    /**
     * Lists the databases matching the configured database list, then the collections in those
     * databases matching the configured collection list.
     */
    private static CollectionDiscoveryUtils.CollectionDiscoveryInfo runDiscovery(MongoDBSourceConfig sourceConfig) {
        MongoClient client = MongoUtils.clientFor(sourceConfig);
        List<String> databases = CollectionDiscoveryUtils.databaseNames(
                client,
                CollectionDiscoveryUtils.databaseFilter(sourceConfig.getDatabaseList()));
        List<String> collections = CollectionDiscoveryUtils.collectionNames(
                client,
                databases,
                CollectionDiscoveryUtils.collectionsFilter(sourceConfig.getCollectionList()));
        return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(databases, collections);
    }

    /**
     * Creates a {@code CREATE} table change describing a table with the given id whose only
     * column is a non-optional {@code _id}, which is also the primary key.
     *
     * @param tableId id assigned to the described table
     * @return the table change wrapping that table definition
     */
    public static TableChanges.TableChange collectionSchema(TableId tableId) {
        Column idColumn = Column.editor()
                .name("_id")
                .optional(false)
                .create();
        Table definition = Table.editor()
                .tableId(tableId)
                .addColumn(idColumn)
                .setPrimaryKeyNames("_id")
                .create();
        return new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, definition);
    }

    /**
     * Determines and logs the current change stream offset.
     *
     * <p>The offset is built from the token returned by
     * {@code MongoUtils.getLatestResumeToken}; when that token is {@code null} it is built from
     * {@code MongoUtils.getCurrentClusterTime} instead.
     *
     * @param sourceConfig config used to obtain the client and the change stream descriptor
     * @return the offset that was logged
     */
    public ChangeStreamOffset displayCurrentOffset(MongoDBSourceConfig sourceConfig) {
        MongoClient client = MongoUtils.clientFor(sourceConfig);
        ChangeStreamDescriptor descriptor = this.describeChangeStream(sourceConfig);
        BsonDocument resumeToken = MongoUtils.getLatestResumeToken(client, descriptor);

        ChangeStreamOffset offset;
        if (resumeToken == null) {
            offset = new ChangeStreamOffset(MongoUtils.getCurrentClusterTime(client));
        } else {
            offset = new ChangeStreamOffset(resumeToken);
        }
        LOG.info("Current change stream offset : {}", offset);
        return offset;
    }

    /** Always reports collection ids as case sensitive. */
    @Override
    public boolean isDataCollectionIdCaseSensitive(MongoDBSourceConfig sourceConfig) {
        return true;
    }

    /** Creates a new {@link MongoDBChunkSplitter} for {@code sourceConfig}. */
    @Override
    public ChunkSplitter createChunkSplitter(MongoDBSourceConfig sourceConfig) {
        return new MongoDBChunkSplitter(sourceConfig);
    }

    /**
     * Creates a scan fetch task for snapshot splits and a stream fetch task for every other
     * split.
     */
    @Override
    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
        if (!sourceSplitBase.isSnapshotSplit()) {
            return new MongoDBStreamFetchTask(sourceSplitBase.asStreamSplit());
        }
        return new MongoDBScanFetchTask(sourceSplitBase.asSnapshotSplit());
    }

    /**
     * Creates a fetch task context bound to this dialect, {@code sourceConfig}, and the change
     * stream descriptor derived from the discovered databases and collections.
     */
    public MongoDBFetchTaskContext createFetchTaskContext(MongoDBSourceConfig sourceConfig) {
        return new MongoDBFetchTaskContext(this, sourceConfig, this.describeChangeStream(sourceConfig));
    }

    /** Builds the change stream descriptor from the (memoized) discovery result. */
    private ChangeStreamDescriptor describeChangeStream(MongoDBSourceConfig sourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discovered = this.discoverAndCacheDataCollections(sourceConfig);
        return MongoUtils.getChangeStreamDescriptor(
                sourceConfig,
                discovered.getDiscoveredDatabases(),
                discovered.getDiscoveredCollections());
    }

    /**
     * Tests {@code catalog.table} of {@code tableId} against the collection filter.
     *
     * <p>The filter is built from {@code sourceConfig} only when none exists yet; afterwards
     * the existing filter is reused regardless of the config passed in.
     */
    @Override
    public boolean isIncludeDataCollection(MongoDBSourceConfig sourceConfig, TableId tableId) {
        Predicate<String> filter = this.collectionsFilter;
        if (filter == null) {
            filter = CollectionDiscoveryUtils.collectionsFilter(sourceConfig.getCollectionList());
            this.collectionsFilter = filter;
        }
        String qualifiedName = tableId.catalog() + "." + tableId.table();
        return filter.test(qualifiedName);
    }
}

