package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.class */
public class MySqlSyncDatabaseAction extends SyncDatabaseActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
    private boolean ignoreIncompatible;
    private final List<Identifier> monitoredTables;
    private final List<Identifier> excludedTables;

    public MySqlSyncDatabaseAction(String str, String str2, Map<String, String> map, Map<String, String> map2) {
        super(str, str2, map, map2, SyncJobHandler.SourceType.MYSQL);
        this.ignoreIncompatible = false;
        this.monitoredTables = new ArrayList();
        this.excludedTables = new ArrayList();
        this.mode = MultiTablesSinkMode.DIVIDED;
    }

    public MySqlSyncDatabaseAction ignoreIncompatible(boolean z) {
        this.ignoreIncompatible = z;
        return this;
    }

    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected void beforeBuildingSourceSink() throws Exception {
        Pattern compile = Pattern.compile(this.includingTables);
        Pattern compile2 = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        JdbcSchemasInfo mySqlTableInfos = MySqlActionUtils.getMySqlTableInfos(this.cdcSourceConfig, str -> {
            return shouldMonitorTable(str, compile, compile2);
        }, this.excludedTables, this.typeMapping);
        logNonPkTables(mySqlTableInfos.nonPkTables());
        List<JdbcTableInfo> mySqlTableInfos2 = mySqlTableInfos.toMySqlTableInfos(this.mergeShards);
        Preconditions.checkArgument(!mySqlTableInfos2.isEmpty(), "No tables found in MySQL database " + ((String) this.cdcSourceConfig.get(MySqlSourceOptions.DATABASE_NAME)) + ", or MySQL database does not exist.");
        TableNameConverter tableNameConverter = new TableNameConverter(this.caseSensitive, this.mergeShards, this.tablePrefix, this.tableSuffix);
        for (JdbcTableInfo jdbcTableInfo : mySqlTableInfos2) {
            Identifier create = Identifier.create(this.database, tableNameConverter.convert(jdbcTableInfo.toPaimonTableName()));
            Schema buildPaimonSchema = CdcActionCommonUtils.buildPaimonSchema(create.getFullName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), this.tableConfig, jdbcTableInfo.schema(), this.metadataConverters, this.caseSensitive, true);
            try {
                FileStoreTable fileStoreTable = (FileStoreTable) this.catalog.getTable(create);
                if (shouldMonitorTable(fileStoreTable.schema(), buildPaimonSchema, incompatibleMessage(fileStoreTable.schema(), jdbcTableInfo, create))) {
                    this.tables.add(alterTableOptions(create, fileStoreTable));
                    this.monitoredTables.addAll(jdbcTableInfo.identifiers());
                } else {
                    this.excludedTables.addAll(jdbcTableInfo.identifiers());
                }
            } catch (Catalog.TableNotExistException e) {
                this.catalog.createTable(create, buildPaimonSchema, false);
                this.tables.add(this.catalog.getTable(create));
                this.monitoredTables.addAll(jdbcTableInfo.identifiers());
            }
        }
        Preconditions.checkState(!this.monitoredTables.isEmpty(), "No tables to be synchronized. Possible cause is the schemas of all tables in specified MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    public MySqlSource<CdcSourceRecord> buildSource() {
        return MySqlActionUtils.buildMySqlSource(this.cdcSourceConfig, CdcActionCommonUtils.tableList(this.mode, (String) this.cdcSourceConfig.get(MySqlSourceOptions.DATABASE_NAME), this.includingTables, this.monitoredTables, this.excludedTables), this.typeMapping);
    }

    private void logNonPkTables(List<Identifier> list) {
        if (list.isEmpty()) {
            return;
        }
        LOG.debug("Didn't find primary keys for tables '{}'. These tables won't be synchronized.", list.stream().map((v0) -> {
            return v0.getFullName();
        }).collect(Collectors.joining(",")));
        this.excludedTables.addAll(list);
    }

    private boolean shouldMonitorTable(String str, Pattern pattern, @Nullable Pattern pattern2) {
        boolean matches = pattern.matcher(str).matches();
        if (pattern2 != null) {
            matches = matches && !pattern2.matcher(str).matches();
        }
        if (!matches) {
            LOG.debug("Source table '{}' is excluded.", str);
        }
        return matches;
    }

    private boolean shouldMonitorTable(TableSchema tableSchema, Schema schema, Supplier<String> supplier) {
        if (CdcActionCommonUtils.schemaCompatible(tableSchema, schema.fields())) {
            return true;
        }
        if (!this.ignoreIncompatible) {
            throw new IllegalArgumentException(supplier.get() + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
        }
        LOG.warn(supplier.get() + "This table will be ignored.");
        return false;
    }

    private Supplier<String> incompatibleMessage(TableSchema tableSchema, JdbcTableInfo jdbcTableInfo, Identifier identifier) {
        return () -> {
            return String.format("Incompatible schema found.\nPaimon table is: %s, fields are: %s.\nMySQL table is: %s, fields are: %s.\n", identifier.getFullName(), tableSchema.fields(), jdbcTableInfo.location(), jdbcTableInfo.schema().fields());
        };
    }

    @VisibleForTesting
    public List<Identifier> monitoredTables() {
        return this.monitoredTables;
    }

    @VisibleForTesting
    public List<Identifier> excludedTables() {
        return this.excludedTables;
    }
}
