package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.lang.invoke.SerializedLambda;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncTableSinkBuilder;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.class */
public class MySqlSyncTableAction implements Action {
    private final Configuration mySqlConfig;
    private final String warehouse;
    private final String database;
    private final String table;
    private final List<String> partitionKeys;
    private final List<String> primaryKeys;
    private final List<String> computedColumnArgs;
    private final Map<String, String> catalogConfig;
    private final Map<String, String> tableConfig;

    MySqlSyncTableAction(Map<String, String> map, String str, String str2, String str3, List<String> list, List<String> list2, Map<String, String> map2, Map<String, String> map3) {
        this(map, str, str2, str3, list, list2, Collections.emptyList(), map2, map3);
    }

    MySqlSyncTableAction(Map<String, String> map, String str, String str2, String str3, List<String> list, List<String> list2, List<String> list3, Map<String, String> map2, Map<String, String> map3) {
        this.mySqlConfig = Configuration.fromMap(map);
        this.warehouse = str;
        this.database = str2;
        this.table = str3;
        this.partitionKeys = list;
        this.primaryKeys = list2;
        this.computedColumnArgs = list3;
        this.catalogConfig = map2;
        this.tableConfig = map3;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        FileStoreTable fileStoreTable;
        MySqlSource<String> buildMySqlSource = MySqlActionUtils.buildMySqlSource(this.mySqlConfig);
        Catalog createPaimonCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.catalogConfig).set((ConfigOption<ConfigOption<String>>) CatalogOptions.WAREHOUSE, (ConfigOption<String>) this.warehouse));
        boolean caseSensitive = createPaimonCatalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }
        MySqlSchema orElseThrow = getMySqlSchemaList().stream().reduce((v0, v1) -> {
            return v0.merge(v1);
        }).orElseThrow(() -> {
            return new RuntimeException("No table satisfies the given database name and table name");
        });
        createPaimonCatalog.createDatabase(this.database, true);
        Identifier identifier = new Identifier(this.database, this.table);
        List<ComputedColumn> buildComputedColumns = MySqlActionUtils.buildComputedColumns(this.computedColumnArgs, orElseThrow.typeMapping());
        Schema buildPaimonSchema = MySqlActionUtils.buildPaimonSchema(orElseThrow, this.partitionKeys, this.primaryKeys, buildComputedColumns, this.tableConfig, caseSensitive);
        try {
            fileStoreTable = (FileStoreTable) createPaimonCatalog.getTable(identifier);
            Preconditions.checkArgument(this.computedColumnArgs.isEmpty(), "Cannot add computed column when table already exists.");
            MySqlActionUtils.assertSchemaCompatible(fileStoreTable.schema(), buildPaimonSchema);
        } catch (Catalog.TableNotExistException e) {
            createPaimonCatalog.createTable(identifier, buildPaimonSchema, false);
            fileStoreTable = (FileStoreTable) createPaimonCatalog.getTable(identifier);
        }
        String str = (String) this.mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ZoneId systemDefault = str == null ? ZoneId.systemDefault() : ZoneId.of(str);
        FlinkCdcSyncTableSinkBuilder withTable = new FlinkCdcSyncTableSinkBuilder().withInput(streamExecutionEnvironment.fromSource(buildMySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")).withParserFactory(() -> {
            return new MySqlDebeziumJsonEventParser(systemDefault, caseSensitive, (List<ComputedColumn>) buildComputedColumns);
        }).withTable(fileStoreTable);
        String str2 = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str2 != null) {
            withTable.withParallelism(Integer.valueOf(Integer.parseInt(str2)));
        }
        withTable.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.table.equals(this.table.toLowerCase()), String.format("Table name [%s] cannot contain upper case in case-insensitive catalog.", this.table));
        for (String str : this.partitionKeys) {
            Preconditions.checkArgument(str.equals(str.toLowerCase()), String.format("Partition keys [%s] cannot contain upper case in case-insensitive catalog.", this.partitionKeys));
        }
        for (String str2 : this.primaryKeys) {
            Preconditions.checkArgument(str2.equals(str2.toLowerCase()), String.format("Primary keys [%s] cannot contain upper case in case-insensitive catalog.", this.primaryKeys));
        }
    }

    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
        Pattern compile = Pattern.compile((String) this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
        Pattern compile2 = Pattern.compile((String) this.mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
        ArrayList arrayList = new ArrayList();
        Connection connection = MySqlActionUtils.getConnection(this.mySqlConfig);
        Throwable th = null;
        try {
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet catalogs = metaData.getCatalogs();
            Throwable th2 = null;
            while (catalogs.next()) {
                try {
                    try {
                        String string = catalogs.getString("TABLE_CAT");
                        if (compile.matcher(string).matches()) {
                            ResultSet tables = metaData.getTables(string, null, "%", null);
                            Throwable th3 = null;
                            while (tables.next()) {
                                try {
                                    try {
                                        String string2 = tables.getString("TABLE_NAME");
                                        if (compile2.matcher(string2).matches()) {
                                            arrayList.add(new MySqlSchema(metaData, string, string2));
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            }
                            if (tables != null) {
                                if (0 != 0) {
                                    try {
                                        tables.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    tables.close();
                                }
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (catalogs != null) {
                        if (th2 != null) {
                            try {
                                catalogs.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            catalogs.close();
                        }
                    }
                    throw th5;
                }
            }
            if (catalogs != null) {
                if (0 != 0) {
                    try {
                        catalogs.close();
                    } catch (Throwable th7) {
                        th2.addSuppressed(th7);
                    }
                } else {
                    catalogs.close();
                }
            }
            return arrayList;
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    public static Optional<Action> create(String[] strArr) {
        MultipleParameterTool fromArgs = MultipleParameterTool.fromArgs(strArr);
        if (fromArgs.has("help")) {
            printHelp();
            return Optional.empty();
        }
        Tuple3<String, String, String> tablePath = Action.getTablePath(fromArgs);
        if (tablePath == null) {
            return Optional.empty();
        }
        List emptyList = Collections.emptyList();
        if (fromArgs.has("partition-keys")) {
            emptyList = (List) Arrays.stream(fromArgs.get("partition-keys").split(FieldListaggAgg.DELIMITER)).collect(Collectors.toList());
        }
        List emptyList2 = Collections.emptyList();
        if (fromArgs.has("primary-keys")) {
            emptyList2 = (List) Arrays.stream(fromArgs.get("primary-keys").split(FieldListaggAgg.DELIMITER)).collect(Collectors.toList());
        }
        List emptyList3 = Collections.emptyList();
        if (fromArgs.has("computed-column")) {
            emptyList3 = new ArrayList(fromArgs.getMultiParameter("computed-column"));
        }
        Optional<Map<String, String>> configMap = Action.getConfigMap(fromArgs, "mysql-conf");
        return !configMap.isPresent() ? Optional.empty() : Optional.of(new MySqlSyncTableAction(configMap.get(), (String) tablePath.f0, (String) tablePath.f1, (String) tablePath.f2, emptyList, emptyList2, emptyList3, Action.getConfigMap(fromArgs, "catalog-conf").orElse(Collections.emptyMap()), Action.getConfigMap(fromArgs, "table-conf").orElse(Collections.emptyMap())));
    }

    private static void printHelp() {
        System.out.println("Action \"mysql-sync-table\" creates a streaming job with a Flink MySQL CDC source and a Paimon table sink to consume CDC events.");
        System.out.println();
        System.out.println("Syntax:");
        System.out.println("  mysql-sync-table --warehouse <warehouse-path> --database <database-name> --table <table-name> [--partition-keys <partition-keys>] [--primary-keys <primary-keys>] [--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
        System.out.println();
        System.out.println("Partition keys syntax:");
        System.out.println("  key1,key2,...");
        System.out.println("If partition key is not defined and the specified Paimon table does not exist, this action will automatically create an unpartitioned Paimon table.");
        System.out.println();
        System.out.println("Primary keys syntax:");
        System.out.println("  key1,key2,...");
        System.out.println("Primary keys will be derived from MySQL tables if not specified.");
        System.out.println();
        System.out.println("Please see doc for usage of --computed-column.");
        System.out.println();
        System.out.println("MySQL CDC source conf syntax:");
        System.out.println("  key=value");
        System.out.println("'hostname', 'username', 'password', 'database-name' and 'table-name' are required configurations, others are optional.");
        System.out.println("For a complete list of supported configurations, see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
        System.out.println();
        System.out.println("Paimon catalog and table sink conf syntax:");
        System.out.println("  key=value");
        System.out.println("For a complete list of supported configurations, see https://paimon.apache.org/docs/master/maintenance/configurations/");
        System.out.println();
        System.out.println("Examples:");
        System.out.println("  mysql-sync-table \\\n    --warehouse hdfs:///path/to/warehouse \\\n    --database test_db \\\n    --table test_table \\\n    --partition-keys pt \\\n    --primary-keys pt,uid \\\n    --mysql-conf hostname=127.0.0.1 \\\n    --mysql-conf username=root \\\n    --mysql-conf password=123456 \\\n    --mysql-conf database-name=source_db \\\n    --mysql-conf table-name='source_table_.*' \\\n    --catalog-conf metastore=hive \\\n    --catalog-conf uri=thrift://hive-metastore:9083 \\\n    --table-conf bucket=4 \\\n    --table-conf changelog-producer=input \\\n    --table-conf sink.parallelism=4");
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        executionEnvironment.execute(String.format("MySQL-Paimon Table Sync: %s.%s", this.database, this.table));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1395025350:
                if (implMethodName.equals("lambda$build$a527abcc$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction") && serializedLambda.getImplMethodSignature().equals("(Ljava/time/ZoneId;ZLjava/util/List;)Lorg/apache/paimon/flink/sink/cdc/EventParser;")) {
                    ZoneId zoneId = (ZoneId) serializedLambda.getCapturedArg(0);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    List list = (List) serializedLambda.getCapturedArg(2);
                    return () -> {
                        return new MySqlDebeziumJsonEventParser(zoneId, booleanValue, (List<ComputedColumn>) list);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
