package org.apache.flink.cdc.connectors.polardbx;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.class */
public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
    private static final String DATABASE = "charset_test";
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create(this.env, EnvironmentSettings.newInstance().inStreamingMode().build());
    private final String testName;
    private final String[] snapshotExpected;
    private final String[] binlogExpected;

    public PolardbxCharsetITCase(String str, String[] strArr, String[] strArr2) {
        this.testName = str;
        this.snapshotExpected = strArr;
        this.binlogExpected = strArr2;
    }

    @Parameterized.Parameters(name = "Test column charset: {0}")
    public static Object[] parameters() {
        return new Object[]{new Object[]{"utf8_test", new String[]{"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}, new String[]{"-D[1, 测试数据]", "-D[2, Craig Marshall]", "-D[3, 另一个测试数据]", "+I[11, 测试数据]", "+I[12, Craig Marshall]", "+I[13, 另一个测试数据]"}}, new Object[]{"ascii_test", new String[]{"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"}, new String[]{"-D[1, ascii test!?]", "-D[2, Craig Marshall]", "-D[3, {test}]", "+I[11, ascii test!?]", "+I[12, Craig Marshall]", "+I[13, {test}]"}}, new Object[]{"gbk_test", new String[]{"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}, new String[]{"-D[1, 测试数据]", "-D[2, Craig Marshall]", "-D[3, 另一个测试数据]", "+I[11, 测试数据]", "+I[12, Craig Marshall]", "+I[13, 另一个测试数据]"}}, new Object[]{"latin1_test", new String[]{"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"}, new String[]{"-D[1, ÀÆÉ]", "-D[2, Craig Marshall]", "-D[3, Üæû]", "+I[11, ÀÆÉ]", "+I[12, Craig Marshall]", "+I[13, Üæû]"}}, new Object[]{"big5_test", new String[]{"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"}, new String[]{"-D[1, 大五]", "-D[2, Craig Marshall]", "-D[3, 丹店]", "+I[11, 大五]", "+I[12, Craig Marshall]", "+I[13, 丹店]"}}};
    }

    @BeforeClass
    public static void beforeClass() throws InterruptedException {
        initializePolardbxTables(DATABASE, str -> {
            return Boolean.valueOf(!StringUtils.isNullOrWhitespaceOnly(str) && (str.contains("utf8_test") || str.contains("latin1_test") || str.contains("gbk_test") || str.contains("big5_test") || str.contains("ascii_test")));
        });
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(4);
        this.env.enableCheckpointing(200L);
    }

    @Test
    public void testCharset() throws Exception {
        this.tEnv.executeSql(String.format("CREATE TABLE %s (\n  table_id BIGINT,\n  table_name STRING,\n  primary key(table_id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", this.testName, "127.0.0.1", PORT, "polardbx_root", "123456", DATABASE, this.testName, true, getServerId(), 4));
        TableResult executeSql = this.tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", this.testName));
        CloseableIterator collect = executeSql.collect();
        waitForSnapshotStarted(collect);
        assertEqualsInAnyOrder(Arrays.asList(this.snapshotExpected), fetchRows(collect, this.snapshotExpected.length));
        Connection jdbcConnection = getJdbcConnection();
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute(String.format("/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;", DATABASE, this.testName));
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    assertEqualsInAnyOrder(Arrays.asList(this.binlogExpected), fetchRows(collect, this.binlogExpected.length));
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    private static void waitForSnapshotStarted(CloseableIterator<Row> closeableIterator) throws Exception {
        while (!closeableIterator.hasNext()) {
            Thread.sleep(100L);
        }
    }
}
