package org.apache.flink.connector.jdbc.databases.sqlserver.table;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.connector.jdbc.databases.sqlserver.SqlServerTestBase;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/jdbc/databases/sqlserver/table/SqlServerTableSourceITCase.class */
/**
 * Integration test that reads a SQL Server table through the Flink {@code jdbc} table connector.
 *
 * <p>Before each test two rows are inserted into {@link #INPUT_TABLE}; each test then registers a
 * Flink table pointing at it and compares the string form of the collected rows with the expected
 * values.
 */
class SqlServerTableSourceITCase extends AbstractTestBase implements SqlServerTestBase {

    /**
     * Database-side definition of the table under test (column name, SQL Server type, Flink type).
     *
     * <p>NOTE(review): a few Flink types declared here differ from the DDL issued in
     * {@link #createFlinkTable()} (e.g. {@code datetime2_col} is TIMESTAMP_WITH_TIME_ZONE here but
     * {@code TIMESTAMP WITHOUT TIME ZONE} there, and nullability differs for several columns).
     * Presumably only the database type is relevant for table creation - confirm before aligning.
     */
    private static final TableRow INPUT_TABLE =
            TableBuilder.tableRow(
                    "sql_test_table",
                    TableBuilder.field("id", TableBuilder.dbType("INT"), DataTypes.INT().notNull()),
                    TableBuilder.field("tiny_int", TableBuilder.dbType("TINYINT"), DataTypes.TINYINT()),
                    TableBuilder.field("small_int", TableBuilder.dbType("SMALLINT"), DataTypes.SMALLINT()),
                    TableBuilder.field("big_int", TableBuilder.dbType("BIGINT"), DataTypes.BIGINT().notNull()),
                    TableBuilder.field("float_col", TableBuilder.dbType("REAL"), DataTypes.FLOAT()),
                    TableBuilder.field("double_col", TableBuilder.dbType("FLOAT"), DataTypes.DOUBLE()),
                    TableBuilder.field("decimal_col", TableBuilder.dbType("DECIMAL(10, 4)"), DataTypes.DECIMAL(10, 4)),
                    TableBuilder.field("bool", TableBuilder.dbType("BIT"), DataTypes.BOOLEAN()),
                    TableBuilder.field("date_col", TableBuilder.dbType("DATE"), DataTypes.DATE()),
                    TableBuilder.field("time_col", TableBuilder.dbType("TIME(5)"), DataTypes.TIME(0)),
                    TableBuilder.field("datetime_col", TableBuilder.dbType("DATETIME"), DataTypes.TIMESTAMP()),
                    TableBuilder.field("datetime2_col", TableBuilder.dbType("DATETIME2"), DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
                    TableBuilder.field("char_col", TableBuilder.dbType("CHAR"), DataTypes.STRING()),
                    TableBuilder.field("nchar_col", TableBuilder.dbType("NCHAR(3)"), DataTypes.STRING()),
                    TableBuilder.field("varchar2_col", TableBuilder.dbType("VARCHAR(30)"), DataTypes.STRING()),
                    TableBuilder.field("nvarchar2_col", TableBuilder.dbType("NVARCHAR(30)"), DataTypes.STRING()),
                    TableBuilder.field("text_col", TableBuilder.dbType("TEXT"), DataTypes.STRING()),
                    TableBuilder.field("ntext_col", TableBuilder.dbType("NTEXT"), DataTypes.STRING()),
                    TableBuilder.field("binary_col", TableBuilder.dbType("BINARY(10)"), DataTypes.BYTES()));

    private static final String INPUT_TABLE_NAME = INPUT_TABLE.getTableName();

    private static StreamExecutionEnvironment env;
    private static TableEnvironment tEnv;

    /** Returns the single table used by this test case. */
    @Override
    public List<TableManaged> getManagedTables() {
        return Collections.singletonList(INPUT_TABLE);
    }

    /**
     * Inserts the two fixture rows and creates a fresh table environment.
     *
     * @throws SQLException if obtaining the connection, inserting a row or closing the connection
     *     fails
     */
    @BeforeEach
    void before() throws SQLException {
        // try-with-resources closes the connection exactly once, and a failure while closing is
        // attached as suppressed instead of replacing the original insert failure.
        try (Connection connection = getMetadata().getConnection()) {
            INPUT_TABLE.insertIntoTableValues(
                    connection,
                    "1, 2, 4, 10000000000, 1.12345, 2.12345678791, 100.1234, 0, "
                            + "'1997-01-01', '05:20:20.222','2020-01-01 15:35:00.123',"
                            + "'2020-01-01 15:35:00.1234567', 'a', 'abc', 'abcdef', 'xyz',"
                            + "'Hello World', 'World Hello', 1024");
            INPUT_TABLE.insertIntoTableValues(
                    connection,
                    "2, 2, 4, 10000000000, 1.12345, 2.12345678791, 101.1234, 1, "
                            + "'1997-01-02', '05:20:20.222','2020-01-01 15:36:01.123',"
                            + "'2020-01-01 15:36:01.1234567', 'a', 'abc', 'abcdef', 'xyz',"
                            + "'Hey Leonard', 'World Hello', 1024");
        }

        // Set up only after the connection has been released, as before.
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(env);
    }

    /** Selecting every column yields both inserted rows. */
    @Test
    void testJdbcSource() {
        createFlinkTable();
        assertQueryReturns(
                "SELECT * FROM " + INPUT_TABLE_NAME,
                "+I[1, 2, 4, 10000000000, 1.12345, 2.12345678791, 100.1234, false, 1997-01-01, 05:20:20, 2020-01-01T15:35:00.123, 2020-01-01T15:35:00.123456700, a, abc, abcdef, xyz, Hello World, World Hello, [0, 0, 0, 0, 0, 0, 0, 0, 4, 0]]",
                "+I[2, 2, 4, 10000000000, 1.12345, 2.12345678791, 101.1234, true, 1997-01-02, 05:20:20, 2020-01-01T15:36:01.123, 2020-01-01T15:36:01.123456700, a, abc, abcdef, xyz, Hey Leonard, World Hello, [0, 0, 0, 0, 0, 0, 0, 0, 4, 0]]");
    }

    /** Selecting a subset of columns yields only those columns for both rows. */
    @Test
    void testProject() {
        createFlinkTable();
        assertQueryReturns(
                "SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE_NAME,
                "+I[1, 2020-01-01T15:35:00.123, 100.1234]",
                "+I[2, 2020-01-01T15:36:01.123, 101.1234]");
    }

    /** A {@code WHERE id = 1} predicate yields only the first row. */
    @Test
    void testFilter() {
        createFlinkTable();
        assertQueryReturns(
                "SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE_NAME + " WHERE id = 1",
                "+I[1, 2020-01-01T15:35:00.123, 100.1234]");
    }

    /**
     * Executes {@code query}, converts every collected row to its string form and asserts that the
     * sorted result equals the sorted {@code expectedRows}.
     *
     * @param query SQL statement to run against the table environment
     * @param expectedRows expected string form of each returned row, in any order
     */
    private void assertQueryReturns(String query, String... expectedRows) {
        List<String> actual =
                CollectionUtil.iteratorToList(tEnv.executeSql(query).collect()).stream()
                        .map(Object::toString)
                        .sorted()
                        .collect(Collectors.toList());
        List<String> expected = Stream.of(expectedRows).sorted().collect(Collectors.toList());
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Registers a Flink table named like {@link #INPUT_TABLE} that uses the {@code jdbc} connector
     * with the URL and credentials supplied by {@code getMetadata()}.
     */
    private void createFlinkTable() {
        tEnv.executeSql(
                "CREATE TABLE "
                        + INPUT_TABLE_NAME
                        + " ("
                        + "id INT NOT NULL,"
                        + "tiny_int TINYINT,"
                        + "small_int SMALLINT,"
                        + "big_int BIGINT,"
                        + "float_col FLOAT,"
                        + "double_col DOUBLE ,"
                        + "decimal_col DECIMAL(10, 4) NOT NULL,"
                        + "bool BOOLEAN NOT NULL,"
                        + "date_col DATE NOT NULL,"
                        + "time_col TIME(0) NOT NULL,"
                        + "datetime_col TIMESTAMP,"
                        + "datetime2_col TIMESTAMP WITHOUT TIME ZONE,"
                        + "char_col STRING NOT NULL,"
                        + "nchar_col STRING NOT NULL,"
                        + "varchar2_col STRING NOT NULL,"
                        + "nvarchar2_col STRING NOT NULL,"
                        + "text_col STRING,"
                        + "ntext_col STRING,"
                        + "binary_col BYTES"
                        + ") WITH ("
                        + "  'connector'='jdbc',"
                        + "  'url'='"
                        + getMetadata().getJdbcUrl()
                        + "',"
                        + "  'table-name'='"
                        + INPUT_TABLE_NAME
                        + "',"
                        + "  'username'='"
                        + getMetadata().getUsername()
                        + "',"
                        + "  'password'='"
                        + getMetadata().getPassword()
                        + "'"
                        + ")");
    }
}
