package org.apache.flink.cdc.connectors.vitess.table;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.vitess.VitessTestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.class */
/**
 * Integration tests for the {@code vitess-cdc} table connector.
 *
 * <p>Each test registers a source table backed by the Vitess container inherited from
 * {@link VitessTestBase}, submits a streaming job, applies changes through JDBC and then verifies
 * the changelog rows the job produced.
 */
public class VitessConnectorITCase extends VitessTestBase {

    /**
     * Pause between submitting a job and applying the first change.
     * NOTE(review): presumably this gives the asynchronously submitted job time to start
     * consuming before any change is made; confirm against the connector's startup behaviour.
     */
    private static final long JOB_STARTUP_WAIT_MS = 10_000L;

    /** Interval between two polls while waiting for results. */
    private static final long POLL_INTERVAL_MS = 100L;

    /**
     * Connector options shared by every source table of this test. The placeholders are, in
     * order: hostname, port, keyspace and table name.
     */
    private static final String VITESS_SOURCE_OPTIONS =
            " WITH ("
                    + " 'connector' = 'vitess-cdc',"
                    + " 'tablet-type' = 'MASTER',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'keyspace' = '%s',"
                    + " 'table-name' = '%s'"
                    + ")";

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Clears the data of the {@code values} test connector and forces a parallelism of 1. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        env.setParallelism(1);
    }

    /**
     * Applies inserts, updates and a delete to {@code test.products} and checks the per-name
     * weight sums that the aggregating job materialized in the {@code values} sink.
     */
    @Test
    public void testConsumingAllEvents()
            throws SQLException, ExecutionException, InterruptedException {
        initializeTable("inventory");
        createVitessSourceTable(
                "CREATE TABLE debezium_source ("
                        + " `id` INT NOT NULL,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(10,3),"
                        + " primary key (`id`) not enforced)",
                "test.products");
        tEnv.executeSql(
                "CREATE TABLE sink ("
                        + " name STRING,"
                        + " weightSum DECIMAL(10,3),"
                        + " PRIMARY KEY (name) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")");

        // executeSql submits the INSERT job and returns without waiting for it to finish.
        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        Thread.sleep(JOB_STARTUP_WAIT_MS);

        // The JDBC resources are only needed for the DML below, so release them right after.
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "INSERT INTO test.products \n"
                            + "VALUES (default,'scooter','Small 2-wheel scooter',3.14),\n"
                            + "       (default,'car battery','12V car battery',8.1),\n"
                            + "       (default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),\n"
                            + "       (default,'hammer','12oz carpenters hammer',0.75),\n"
                            + "       (default,'hammer','14oz carpenters hammer',0.875),\n"
                            + "       (default,'hammer','16oz carpenters hammer',1.0),\n"
                            + "       (default,'rocks','box of assorted rocks',5.3),\n"
                            + "       (default,'jacket','water resistent black wind breaker',0.1),\n"
                            + "       (default,'spare tire','24 inch spare tire',22.2);");
            statement.execute(
                    "UPDATE test.products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE test.products SET weight='5.1' WHERE id=107;");
            statement.execute(
                    "INSERT INTO test.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute(
                    "INSERT INTO test.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute(
                    "UPDATE test.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE test.products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM test.products WHERE id=111;");
        }

        // 20 matches the 'sink-expected-messages-num' option of the sink table above.
        waitForSinkSize("sink", 20);

        List<String> expected =
                Arrays.asList(
                        "+I[scooter, 3.140]",
                        "+I[car battery, 8.100]",
                        "+I[12-pack drill bits, 0.800]",
                        "+I[hammer, 2.625]",
                        "+I[rocks, 5.100]",
                        "+I[jacket, 0.600]",
                        "+I[spare tire, 22.200]");
        assertEqualsInAnyOrder(expected, TestValuesTableFactory.getResults("sink"));

        JobClient jobClient = result.getJobClient().get();
        jobClient.cancel().get();
    }

    /**
     * Inserts one row that populates every column of {@code test.full_types}, updates it, and
     * checks the changelog rows emitted by a {@code SELECT *} over the source table.
     */
    @Test
    public void testAllTypes() throws Throwable {
        initializeTable("column_type_test");
        createVitessSourceTable(
                "CREATE TABLE full_types (\n"
                        + "    `id` INT NOT NULL,\n"
                        + "    tiny_c TINYINT,\n"
                        + "    tiny_un_c SMALLINT ,\n"
                        + "    small_c SMALLINT,\n"
                        + "    small_un_c INT,\n"
                        + "    int_c INT ,\n"
                        + "    int_un_c BIGINT,\n"
                        + "    int11_c BIGINT,\n"
                        + "    big_c BIGINT,\n"
                        + "    varchar_c STRING,\n"
                        + "    char_c STRING,\n"
                        + "    float_c FLOAT,\n"
                        + "    double_c DOUBLE,\n"
                        + "    decimal_c DECIMAL(8, 4),\n"
                        + "    numeric_c DECIMAL(6, 0),\n"
                        + "    boolean_c BOOLEAN,\n"
                        + "    primary key (`id`) not enforced)",
                "test.full_types");

        TableResult result = tEnv.executeSql("SELECT * FROM full_types");
        Thread.sleep(JOB_STARTUP_WAIT_MS);

        // The JDBC resources are only needed for the DML below, so release them right after.
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "INSERT INTO test.full_types VALUES (\n"
                            + "    DEFAULT, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807,\n"
                            + "    'Hello World', 'abc', 123.102, 404.4443, 123.4567, 345.6, true);");
            statement.execute("UPDATE test.full_types SET varchar_c = 'Bye World' WHERE id=1;");
        }

        waitForSnapshotStarted(result.collect());

        List<String> expected =
                Arrays.asList(
                        "+I[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
                        "-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
                        "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]");
        List<String> actual = fetchRows(result.collect(), expected.size());
        Assert.assertEquals(expected, actual);

        JobClient jobClient = result.getJobClient().get();
        jobClient.cancel().get();
    }

    /**
     * Registers a {@code vitess-cdc} source table that points at the test container.
     *
     * @param createTableDdl the {@code CREATE TABLE name (columns...)} part of the statement,
     *     without a {@code WITH} clause; it is used as part of a format string and therefore must
     *     not contain {@code %}
     * @param physicalTable value of the connector's {@code table-name} option
     */
    private void createVitessSourceTable(String createTableDdl, String physicalTable) {
        tEnv.executeSql(
                String.format(
                        createTableDdl + VITESS_SOURCE_OPTIONS,
                        VITESS_CONTAINER.getHost(),
                        VITESS_CONTAINER.getGrpcPort(),
                        VITESS_CONTAINER.getKeyspace(),
                        physicalTable));
    }

    /**
     * Reads up to {@code size} rows from the iterator and returns their string forms.
     *
     * @param iter source of rows
     * @param size maximum number of rows to read
     * @return the rows read, in iteration order; shorter than {@code size} if the iterator is
     *     exhausted first
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        while (size > 0 && iter.hasNext()) {
            rows.add(iter.next().toString());
            size--;
        }
        return rows;
    }

    /**
     * Asserts that both lists are non-null and contain the same elements, ignoring order.
     *
     * @param expected the expected elements
     * @param actual the elements to check
     */
    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
        Assert.assertTrue(expected != null && actual != null);
        Assert.assertEquals(
                expected.stream().sorted().collect(Collectors.toList()),
                actual.stream().sorted().collect(Collectors.toList()));
    }

    /** Polls the iterator until it reports that a row is available. */
    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
        while (!iterator.hasNext()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /** Polls until the named {@code values} sink holds at least {@code expectedSize} records. */
    private static void waitForSinkSize(String sinkName, int expectedSize)
            throws InterruptedException {
        while (sinkSize(sinkName) < expectedSize) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Returns the number of raw records of the named sink, or 0 when the lookup is rejected with
     * an {@link IllegalArgumentException}.
     */
    private static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(sinkName).size();
            } catch (IllegalArgumentException e) {
                // NOTE(review): presumably thrown while the sink has not received any data
                // yet; treated as "empty" so that callers keep polling.
                return 0;
            }
        }
    }
}
