package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.lang.invoke.SerializedLambda;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.class */
public class MySqlSyncDatabaseAction implements Action {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
    private final Configuration mySqlConfig;
    private final String warehouse;
    private final String database;
    private final boolean ignoreIncompatible;
    private final String tablePrefix;
    private final String tableSuffix;

    @Nullable
    private final Pattern includingPattern;

    @Nullable
    private final Pattern excludingPattern;
    private final Map<String, String> catalogConfig;
    private final Map<String, String> tableConfig;

    MySqlSyncDatabaseAction(Map<String, String> map, String str, String str2, boolean z, Map<String, String> map2, Map<String, String> map3) {
        this(map, str, str2, z, null, null, null, null, map2, map3);
    }

    MySqlSyncDatabaseAction(Map<String, String> map, String str, String str2, boolean z, @Nullable String str3, @Nullable String str4, @Nullable String str5, @Nullable String str6, Map<String, String> map2, Map<String, String> map3) {
        this.mySqlConfig = Configuration.fromMap(map);
        this.warehouse = str;
        this.database = str2;
        this.ignoreIncompatible = z;
        this.tablePrefix = str3 == null ? "" : str3;
        this.tableSuffix = str4 == null ? "" : str4;
        this.includingPattern = str5 == null ? null : Pattern.compile(str5);
        this.excludingPattern = str6 == null ? null : Pattern.compile(str6);
        this.catalogConfig = map2;
        this.tableConfig = map3;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        FileStoreTable fileStoreTable;
        Preconditions.checkArgument(!this.mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), MySqlSourceOptions.TABLE_NAME.key() + " cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
        Catalog createPaimonCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.catalogConfig).set((ConfigOption<ConfigOption<String>>) CatalogOptions.WAREHOUSE, (ConfigOption<String>) this.warehouse));
        boolean caseSensitive = createPaimonCatalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }
        List<MySqlSchema> mySqlSchemaList = getMySqlSchemaList();
        Preconditions.checkArgument(mySqlSchemaList.size() > 0, "No tables found in MySQL database " + ((String) this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)) + ", or MySQL database does not exist.");
        createPaimonCatalog.createDatabase(this.database, true);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, this.tablePrefix, this.tableSuffix);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (MySqlSchema mySqlSchema : mySqlSchemaList) {
            Identifier identifier = new Identifier(this.database, tableNameConverter.convert(mySqlSchema.tableName()));
            Schema buildPaimonSchema = MySqlActionUtils.buildPaimonSchema(mySqlSchema, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), this.tableConfig, caseSensitive);
            try {
                fileStoreTable = (FileStoreTable) createPaimonCatalog.getTable(identifier);
                if (shouldMonitorTable(fileStoreTable.schema(), buildPaimonSchema, incompatibleMessage(fileStoreTable.schema(), mySqlSchema, identifier))) {
                    arrayList2.add(mySqlSchema.tableName());
                }
            } catch (Catalog.TableNotExistException e) {
                createPaimonCatalog.createTable(identifier, buildPaimonSchema, false);
                fileStoreTable = (FileStoreTable) createPaimonCatalog.getTable(identifier);
                arrayList2.add(mySqlSchema.tableName());
            }
            arrayList.add(fileStoreTable);
        }
        Preconditions.checkState(!arrayList2.isEmpty(), "No tables to be synchronized. Possible cause is the schemas of all tables in specified MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
        this.mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", arrayList2) + ")");
        MySqlSource<String> buildMySqlSource = MySqlActionUtils.buildMySqlSource(this.mySqlConfig);
        String str = (String) this.mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ZoneId systemDefault = str == null ? ZoneId.systemDefault() : ZoneId.of(str);
        FlinkCdcSyncDatabaseSinkBuilder withTables = new FlinkCdcSyncDatabaseSinkBuilder().withInput(streamExecutionEnvironment.fromSource(buildMySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")).withParserFactory(() -> {
            return new MySqlDebeziumJsonEventParser(systemDefault, caseSensitive, tableNameConverter);
        }).withTables(arrayList);
        String str2 = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str2 != null) {
            withTables.withParallelism(Integer.valueOf(Integer.parseInt(str2)));
        }
        withTables.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.tablePrefix.equals(this.tablePrefix.toLowerCase()), String.format("Table prefix [%s] cannot contain upper case in case-insensitive catalog.", this.tablePrefix));
        Preconditions.checkArgument(this.tableSuffix.equals(this.tableSuffix.toLowerCase()), String.format("Table suffix [%s] cannot contain upper case in case-insensitive catalog.", this.tablePrefix));
    }

    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
        String str = (String) this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
        ArrayList arrayList = new ArrayList();
        Connection connection = MySqlActionUtils.getConnection(this.mySqlConfig);
        Throwable th = null;
        try {
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet tables = metaData.getTables(str, null, "%", new String[]{"TABLE"});
            Throwable th2 = null;
            while (tables.next()) {
                try {
                    try {
                        String string = tables.getString("TABLE_NAME");
                        if (shouldMonitorTable(string)) {
                            MySqlSchema mySqlSchema = new MySqlSchema(metaData, str, string);
                            if (mySqlSchema.primaryKeys().size() > 0) {
                                arrayList.add(mySqlSchema);
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (tables != null) {
                        if (th2 != null) {
                            try {
                                tables.close();
                            } catch (Throwable th4) {
                                th2.addSuppressed(th4);
                            }
                        } else {
                            tables.close();
                        }
                    }
                    throw th3;
                }
            }
            if (tables != null) {
                if (0 != 0) {
                    try {
                        tables.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    tables.close();
                }
            }
            return arrayList;
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    private boolean shouldMonitorTable(String str) {
        boolean z = true;
        if (this.includingPattern != null) {
            z = this.includingPattern.matcher(str).matches();
        }
        if (this.excludingPattern != null) {
            z = z && !this.excludingPattern.matcher(str).matches();
        }
        LOG.debug("Source table {} is monitored? {}", str, Boolean.valueOf(z));
        return z;
    }

    private boolean shouldMonitorTable(TableSchema tableSchema, Schema schema, Supplier<String> supplier) {
        if (MySqlActionUtils.schemaCompatible(tableSchema, schema)) {
            return true;
        }
        if (!this.ignoreIncompatible) {
            throw new IllegalArgumentException(supplier.get() + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
        }
        LOG.warn(supplier.get() + "This table will be ignored.");
        return false;
    }

    private Supplier<String> incompatibleMessage(TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) {
        return () -> {
            return String.format("Incompatible schema found.\nPaimon table is: %s, fields are: %s.\nMySQL table is: %s.%s, fields are: %s.\n", identifier.getFullName(), tableSchema.fields(), mySqlSchema.databaseName(), mySqlSchema.tableName(), mySqlSchema.fields());
        };
    }

    public static Optional<Action> create(String[] strArr) {
        MultipleParameterTool fromArgs = MultipleParameterTool.fromArgs(strArr);
        if (fromArgs.has("help")) {
            printHelp();
            return Optional.empty();
        }
        String str = fromArgs.get("warehouse");
        String str2 = fromArgs.get("database");
        boolean parseBoolean = Boolean.parseBoolean(fromArgs.get("ignore-incompatible"));
        String str3 = fromArgs.get("table-prefix");
        String str4 = fromArgs.get("table-suffix");
        String str5 = fromArgs.get("including-tables");
        String str6 = fromArgs.get("excluding-tables");
        Optional<Map<String, String>> configMap = Action.getConfigMap(fromArgs, "mysql-conf");
        Optional<Map<String, String>> configMap2 = Action.getConfigMap(fromArgs, "catalog-conf");
        Optional<Map<String, String>> configMap3 = Action.getConfigMap(fromArgs, "table-conf");
        return configMap.map(map -> {
            return new MySqlSyncDatabaseAction(map, str, str2, parseBoolean, str3, str4, str5, str6, (Map) configMap2.orElse(Collections.emptyMap()), (Map) configMap3.orElse(Collections.emptyMap()));
        });
    }

    private static void printHelp() {
        System.out.println("Action \"mysql-sync-database\" creates a streaming job with a Flink MySQL CDC source and multiple Paimon table sinks to synchronize a whole MySQL database into one Paimon database.\nOnly MySQL tables with primary keys will be considered. Newly created MySQL tables after the job starts will not be included.");
        System.out.println();
        System.out.println("Syntax:");
        System.out.println("  mysql-sync-database --warehouse <warehouse-path> --database <database-name> [--ignore-incompatible <true/false>] [--table-prefix <paimon-table-prefix>] [--table-suffix <paimon-table-suffix>] [--including-tables <mysql-table-name|name-regular-expr>] [--excluding-tables <mysql-table-name|name-regular-expr>] [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
        System.out.println();
        System.out.println("--ignore-incompatible is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible, an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.");
        System.out.println();
        System.out.println("--table-prefix is the prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have \"ods_\" as prefix, you can specify `--table-prefix ods_`.");
        System.out.println("The usage of --table-suffix is same as `--table-prefix`");
        System.out.println();
        System.out.println("--including-tables is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Regular expression is supported.");
        System.out.println("--excluding-tables is used to specify which source tables are not to be synchronized. The usage is same as --including-tables.");
        System.out.println("--excluding-tables has higher priority than --including-tables if you specified both.");
        System.out.println();
        System.out.println("MySQL CDC source conf syntax:");
        System.out.println("  key=value");
        System.out.println("'hostname', 'username', 'password' and 'database-name' are required configurations, others are optional. Note that 'database-name' should be the exact name of the MySQL databse you want to synchronize. It can't be a regular expression.");
        System.out.println("For a complete list of supported configurations, see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
        System.out.println();
        System.out.println("Paimon catalog and table sink conf syntax:");
        System.out.println("  key=value");
        System.out.println("All Paimon sink table will be applied the same set of configurations.");
        System.out.println("For a complete list of supported configurations, see https://paimon.apache.org/docs/master/maintenance/configurations/");
        System.out.println();
        System.out.println("Examples:");
        System.out.println("  mysql-sync-database \\\n    --warehouse hdfs:///path/to/warehouse \\\n    --database test_db \\\n    --mysql-conf hostname=127.0.0.1 \\\n    --mysql-conf username=root \\\n    --mysql-conf password=123456 \\\n    --mysql-conf database-name=source_db \\\n    --catalog-conf metastore=hive \\\n    --catalog-conf uri=thrift://hive-metastore:9083 \\\n    --table-conf bucket=4 \\\n    --table-conf changelog-producer=input \\\n    --table-conf sink.parallelism=4");
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        executionEnvironment.execute(String.format("MySQL-Paimon Database Sync: %s", this.database));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 243365463:
                if (implMethodName.equals("lambda$build$b9953546$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction") && serializedLambda.getImplMethodSignature().equals("(Ljava/time/ZoneId;ZLorg/apache/paimon/flink/action/cdc/mysql/TableNameConverter;)Lorg/apache/paimon/flink/sink/cdc/EventParser;")) {
                    ZoneId zoneId = (ZoneId) serializedLambda.getCapturedArg(0);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    TableNameConverter tableNameConverter = (TableNameConverter) serializedLambda.getCapturedArg(2);
                    return () -> {
                        return new MySqlDebeziumJsonEventParser(zoneId, booleanValue, tableNameConverter);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
