package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.class */
public class MySqlSyncDatabaseAction extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
    private final String database;
    private final Configuration mySqlConfig;
    private Map<String, String> tableConfig;
    private boolean ignoreIncompatible;
    private boolean mergeShards;
    private String tablePrefix;
    private String tableSuffix;
    private String includingTables;

    @Nullable
    String excludingTables;
    private MultiTablesSinkMode mode;
    private TypeMapping typeMapping;
    private final List<Identifier> monitoredTables;
    private final List<Identifier> excludedTables;
    private List<String> metadataColumn;

    public MySqlSyncDatabaseAction(String str, String str2, Map<String, String> map, Map<String, String> map2) {
        super(str, map);
        this.tableConfig = new HashMap();
        this.ignoreIncompatible = false;
        this.mergeShards = true;
        this.tablePrefix = "";
        this.tableSuffix = "";
        this.includingTables = ".*";
        this.mode = MultiTablesSinkMode.DIVIDED;
        this.typeMapping = TypeMapping.defaultMapping();
        this.monitoredTables = new ArrayList();
        this.excludedTables = new ArrayList();
        this.metadataColumn = new ArrayList();
        this.database = str2;
        this.mySqlConfig = Configuration.fromMap(map2);
        MySqlActionUtils.registerJdbcDriver();
    }

    public MySqlSyncDatabaseAction withTableConfig(Map<String, String> map) {
        this.tableConfig = map;
        return this;
    }

    public MySqlSyncDatabaseAction ignoreIncompatible(boolean z) {
        this.ignoreIncompatible = z;
        return this;
    }

    public MySqlSyncDatabaseAction mergeShards(boolean z) {
        this.mergeShards = z;
        return this;
    }

    public MySqlSyncDatabaseAction withTablePrefix(@Nullable String str) {
        if (str != null) {
            this.tablePrefix = str;
        }
        return this;
    }

    public MySqlSyncDatabaseAction withTableSuffix(@Nullable String str) {
        if (str != null) {
            this.tableSuffix = str;
        }
        return this;
    }

    public MySqlSyncDatabaseAction includingTables(@Nullable String str) {
        if (str != null) {
            this.includingTables = str;
        }
        return this;
    }

    public MySqlSyncDatabaseAction excludingTables(@Nullable String str) {
        this.excludingTables = str;
        return this;
    }

    public MySqlSyncDatabaseAction withMode(MultiTablesSinkMode multiTablesSinkMode) {
        this.mode = multiTablesSinkMode;
        return this;
    }

    public MySqlSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public MySqlSyncDatabaseAction withMetadataKeys(List<String> list) {
        this.metadataColumn = list;
        return this;
    }

    public void build() throws Exception {
        Preconditions.checkArgument(!this.mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), MySqlSourceOptions.TABLE_NAME.key() + " cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
        boolean caseSensitive = this.catalog.caseSensitive();
        validateCaseInsensitive(caseSensitive);
        Pattern compile = Pattern.compile(this.includingTables);
        Pattern compile2 = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        MySqlSchemasInfo mySqlTableInfos = MySqlActionUtils.getMySqlTableInfos(this.mySqlConfig, str -> {
            return shouldMonitorTable(str, compile, compile2);
        }, this.excludedTables, this.typeMapping, caseSensitive);
        logNonPkTables(mySqlTableInfos.nonPkTables());
        List<MySqlTableInfo> mySqlTableInfos2 = mySqlTableInfos.toMySqlTableInfos(this.mergeShards);
        Preconditions.checkArgument(mySqlTableInfos2.size() > 0, "No tables found in MySQL database " + ((String) this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)) + ", or MySQL database does not exist.");
        this.catalog.createDatabase(this.database, true);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, this.mergeShards, this.tablePrefix, this.tableSuffix);
        CdcMetadataConverter[] cdcMetadataConverterArr = (CdcMetadataConverter[]) this.metadataColumn.stream().map(MySqlMetadataProcessor::converter).toArray(i -> {
            return new CdcMetadataConverter[i];
        });
        ArrayList arrayList = new ArrayList();
        for (MySqlTableInfo mySqlTableInfo : mySqlTableInfos2) {
            Identifier create = Identifier.create(this.database, tableNameConverter.convert(mySqlTableInfo.toPaimonTableName()));
            Schema buildPaimonSchema = CdcActionCommonUtils.buildPaimonSchema(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), this.tableConfig, mySqlTableInfo.schema(), cdcMetadataConverterArr, true);
            try {
                FileStoreTable copy = this.catalog.getTable(create).copy(this.tableConfig);
                if (shouldMonitorTable(copy.schema(), buildPaimonSchema, incompatibleMessage(copy.schema(), mySqlTableInfo, create))) {
                    arrayList.add(copy);
                    this.monitoredTables.addAll(mySqlTableInfo.identifiers());
                } else {
                    this.excludedTables.addAll(mySqlTableInfo.identifiers());
                }
            } catch (Catalog.TableNotExistException e) {
                this.catalog.createTable(create, buildPaimonSchema, false);
                arrayList.add(this.catalog.getTable(create));
                this.monitoredTables.addAll(mySqlTableInfo.identifiers());
            }
        }
        Preconditions.checkState(!this.monitoredTables.isEmpty(), "No tables to be synchronized. Possible cause is the schemas of all tables in specified MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
        MySqlSource<String> buildMySqlSource = MySqlActionUtils.buildMySqlSource(this.mySqlConfig, CdcActionCommonUtils.tableList(this.mode, (String) this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME), this.includingTables, this.monitoredTables, this.excludedTables));
        NewTableSchemaBuilder newTableSchemaBuilder = new NewTableSchemaBuilder(this.tableConfig, caseSensitive);
        MySqlRecordParser mySqlRecordParser = new MySqlRecordParser(this.mySqlConfig, caseSensitive, this.typeMapping, cdcMetadataConverterArr);
        new FlinkCdcSyncDatabaseSinkBuilder().withInput(this.env.fromSource(buildMySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").flatMap(mySqlRecordParser).name("Parse")).withParserFactory(() -> {
            return new RichCdcMultiplexRecordEventParser(newTableSchemaBuilder, compile, compile2, tableNameConverter);
        }).withDatabase(this.database).withCatalogLoader(catalogLoader()).withTables(arrayList).withMode(this.mode).withTableOptions(this.tableConfig).build();
    }

    private void validateCaseInsensitive(boolean z) {
        AbstractCatalog.validateCaseInsensitive(z, "Database", new String[]{this.database});
        AbstractCatalog.validateCaseInsensitive(z, "Table prefix", new String[]{this.tablePrefix});
        AbstractCatalog.validateCaseInsensitive(z, "Table suffix", new String[]{this.tableSuffix});
    }

    private void logNonPkTables(List<Identifier> list) {
        if (list.isEmpty()) {
            return;
        }
        LOG.debug("Didn't find primary keys for tables '{}'. These tables won't be synchronized.", list.stream().map((v0) -> {
            return v0.getFullName();
        }).collect(Collectors.joining(",")));
        this.excludedTables.addAll(list);
    }

    private boolean shouldMonitorTable(String str, Pattern pattern, @Nullable Pattern pattern2) {
        boolean matches = pattern.matcher(str).matches();
        if (pattern2 != null) {
            matches = matches && !pattern2.matcher(str).matches();
        }
        if (!matches) {
            LOG.debug("Source table '{}' is excluded.", str);
        }
        return matches;
    }

    private boolean shouldMonitorTable(TableSchema tableSchema, Schema schema, Supplier<String> supplier) {
        if (CdcActionCommonUtils.schemaCompatible(tableSchema, schema.fields())) {
            return true;
        }
        if (!this.ignoreIncompatible) {
            throw new IllegalArgumentException(supplier.get() + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
        }
        LOG.warn(supplier.get() + "This table will be ignored.");
        return false;
    }

    private Supplier<String> incompatibleMessage(TableSchema tableSchema, MySqlTableInfo mySqlTableInfo, Identifier identifier) {
        return () -> {
            return String.format("Incompatible schema found.\nPaimon table is: %s, fields are: %s.\nMySQL table is: %s, fields are: %s.\n", identifier.getFullName(), tableSchema.fields(), mySqlTableInfo.location(), mySqlTableInfo.schema().fields());
        };
    }

    @VisibleForTesting
    public List<Identifier> monitoredTables() {
        return this.monitoredTables;
    }

    @VisibleForTesting
    public List<Identifier> excludedTables() {
        return this.excludedTables;
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    public void run() throws Exception {
        build();
        execute(String.format("MySQL-Paimon Database Sync: %s", this.database));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 143832632:
                if (implMethodName.equals("lambda$build$5605a7ea$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder;Ljava/util/regex/Pattern;Ljava/util/regex/Pattern;Lorg/apache/paimon/flink/action/cdc/TableNameConverter;)Lorg/apache/paimon/flink/sink/cdc/EventParser;")) {
                    NewTableSchemaBuilder newTableSchemaBuilder = (NewTableSchemaBuilder) serializedLambda.getCapturedArg(0);
                    Pattern pattern = (Pattern) serializedLambda.getCapturedArg(1);
                    Pattern pattern2 = (Pattern) serializedLambda.getCapturedArg(2);
                    TableNameConverter tableNameConverter = (TableNameConverter) serializedLambda.getCapturedArg(3);
                    return () -> {
                        return new RichCdcMultiplexRecordEventParser(newTableSchemaBuilder, pattern, pattern2, tableNameConverter);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
