package org.apache.paimon.flink.action.cdc.mongodb;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.class */
/**
 * Table-level synchronization action that uses MongoDB as its CDC source.
 *
 * <p>Provides the MongoDB-specific hooks required by {@link SyncTableActionBase}: argument
 * validation, schema retrieval, source construction, record parsing and naming.
 */
public class MongoDBSyncTableAction extends SyncTableActionBase {

    /**
     * Creates the action. Every argument is forwarded, unchanged and in the same order, to the
     * {@link SyncTableActionBase} constructor.
     */
    public MongoDBSyncTableAction(
            String str,
            String str2,
            String str3,
            Map<String, String> map,
            Map<String, String> map2) {
        super(str, str2, str3, map, map2);
    }

    /**
     * Verifies that the MongoDB source configuration contains the collection option.
     *
     * <p>Only {@link MongoDBSourceOptions#COLLECTION} is checked here.
     */
    @Override
    protected void checkCdcSourceArgument() {
        boolean collectionConfigured = cdcSourceConfig.contains(MongoDBSourceOptions.COLLECTION);
        String failureMessage =
                String.format(
                        "mongodb-conf [%s] must be specified.",
                        MongoDBSourceOptions.COLLECTION.key());
        Preconditions.checkArgument(collectionConfigured, failureMessage);
    }

    /**
     * Obtains the schema via {@code MongodbSchemaUtils}, passing along the source configuration
     * and the catalog's case-sensitivity flag.
     */
    @Override
    protected Schema retrieveSchema() throws Exception {
        boolean caseSensitive = catalog.caseSensitive();
        return MongodbSchemaUtils.getMongodbSchema(cdcSourceConfig, caseSensitive);
    }

    /**
     * Builds the MongoDB source stream for the configured database and collection.
     *
     * <p>The two names are joined with a backslash-escaped dot; presumably the source treats the
     * resulting string as a pattern - confirm against {@code MongoDBActionUtils}.
     */
    @Override
    protected DataStreamSource<String> buildSource() throws Exception {
        String databaseName = cdcSourceConfig.get(MongoDBSourceOptions.DATABASE);
        String collectionName = cdcSourceConfig.get(MongoDBSourceOptions.COLLECTION);
        String tableList = databaseName + "\\." + collectionName;
        return buildDataStreamSource(
                MongoDBActionUtils.buildMongodbSource(cdcSourceConfig, tableList));
    }

    /** Returns the fixed display name of the source. */
    @Override
    protected String sourceName() {
        return "MongoDB Source";
    }

    /**
     * Creates the parser that turns raw source strings into {@link RichCdcMultiplexRecord}
     * instances.
     */
    @Override
    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
        return new MongoDBRecordParser(catalog.caseSensitive(), computedColumns, cdcSourceConfig);
    }

    /** Returns the job name, which embeds the {@code database} and {@code table} fields. */
    @Override
    protected String jobName() {
        String nameTemplate = "MongoDB-Paimon Table Sync: %s.%s";
        return String.format(nameTemplate, database, table);
    }
}
