package org.apache.flink.cdc.connectors.mysql.source;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.stream.Stream;
import javax.xml.bind.DatatypeConverter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.RecordDataTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

/**
 * Integration tests that read the {@code common_types} and {@code time_types} tables through the
 * MySQL pipeline source and compare the decoded record fields with hand-written expectations.
 *
 * <p>Every scenario follows the same shape: initialize a fresh database, read the row emitted for
 * the existing data, run one {@code UPDATE} over JDBC, then read the resulting change event and
 * compare its after-image. Each scenario runs once against the container inherited from the base
 * class ({@code MYSQL_CONTAINER}, presumably MySQL 5.7 judging by the field name - confirm in
 * {@code MySqlSourceTestBase}) and once against the MySQL 8.0 container started by this class.
 */
public class MySqlFullTypesITCase extends MySqlSourceTestBase {

    /** MySQL 8.0 container owned by this class; started and stopped once per test class. */
    private static final MySqlContainer MYSQL8_CONTAINER =
            createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

    /**
     * Position of the field that the {@code UPDATE common_types SET big_decimal_c = null}
     * statement is expected to turn into {@code null}.
     */
    private static final int BIG_DECIMAL_C_INDEX = 30;

    /**
     * Position of the JSON field in the common-types row. Its expected text is
     * {@code {"key1": "value1"}} in the first assertion and {@code {"key1":"value1"}} (no space
     * after the colon) in the change event read after the update.
     */
    private static final int JSON_C_INDEX = 44;

    /** Position of the {@code TIME(6)} field that the time-types update sets to {@code null}. */
    private static final int TIME_6_C_INDEX = 5;

    /** Field types used to decode a {@code common_types} record (53 fields, in column order). */
    private static final RowType COMMON_TYPES =
            RowType.of(
                    new DataType[] {
                        DataTypes.DECIMAL(20, 0).notNull(),
                        DataTypes.TINYINT(),
                        DataTypes.SMALLINT(),
                        DataTypes.SMALLINT(),
                        DataTypes.SMALLINT(),
                        DataTypes.INT(),
                        DataTypes.INT(),
                        DataTypes.INT(),
                        DataTypes.INT(),
                        DataTypes.INT(),
                        DataTypes.INT(),
                        DataTypes.BIGINT(),
                        DataTypes.BIGINT(),
                        DataTypes.BIGINT(),
                        DataTypes.BIGINT(),
                        DataTypes.DECIMAL(20, 0),
                        DataTypes.DECIMAL(20, 0),
                        DataTypes.VARCHAR(255),
                        DataTypes.CHAR(3),
                        DataTypes.DOUBLE(),
                        DataTypes.FLOAT(),
                        DataTypes.FLOAT(),
                        DataTypes.FLOAT(),
                        DataTypes.DOUBLE(),
                        DataTypes.DOUBLE(),
                        DataTypes.DOUBLE(),
                        DataTypes.DECIMAL(8, 4),
                        DataTypes.DECIMAL(8, 4),
                        DataTypes.DECIMAL(8, 4),
                        DataTypes.DECIMAL(6, 0),
                        DataTypes.STRING(),
                        DataTypes.BOOLEAN(),
                        DataTypes.BINARY(1),
                        DataTypes.BOOLEAN(),
                        DataTypes.BOOLEAN(),
                        DataTypes.BINARY(16),
                        DataTypes.BINARY(8),
                        DataTypes.STRING(),
                        DataTypes.BYTES(),
                        DataTypes.BYTES(),
                        DataTypes.BYTES(),
                        DataTypes.BYTES(),
                        DataTypes.INT(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING()
                    });

    private final UniqueDatabase fullTypesMySql57Database =
            new UniqueDatabase(
                    MYSQL_CONTAINER,
                    "column_type_test",
                    MySqSourceTestUtils.TEST_USER,
                    MySqSourceTestUtils.TEST_PASSWORD);

    private final UniqueDatabase fullTypesMySql8Database =
            new UniqueDatabase(
                    MYSQL8_CONTAINER,
                    "column_type_test_mysql8",
                    MySqSourceTestUtils.TEST_USER,
                    MySqSourceTestUtils.TEST_PASSWORD);

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    /** Starts the MySQL 8 container and blocks until startup has completed. */
    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
        LOG.info("Container MySql8 is started.");
    }

    /** Stops the MySQL 8 container started in {@link #beforeClass()}. */
    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    /** Configures the execution environment: parallelism 4, 200 ms checkpoints, no restarts. */
    @Before
    public void before() {
        env.setParallelism(4);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @Test
    public void testMysql57CommonDataTypes() throws Exception {
        testCommonDataTypes(fullTypesMySql57Database);
    }

    @Test
    public void testMySql8CommonDataTypes() throws Exception {
        testCommonDataTypes(fullTypesMySql8Database);
    }

    /**
     * Time-type round trip against the inherited container. Both {@code TIMESTAMP_LTZ} fields are
     * declared with precision 0 here.
     */
    @Test
    public void testMysql57TimeDataTypes() throws Exception {
        RowType recordType =
                RowType.of(
                        new DataType[] {
                            DataTypes.DECIMAL(20, 0).notNull(),
                            DataTypes.INT(),
                            DataTypes.DATE(),
                            DataTypes.TIME(0),
                            DataTypes.TIME(3),
                            DataTypes.TIME(6),
                            DataTypes.TIMESTAMP(0),
                            DataTypes.TIMESTAMP(3),
                            DataTypes.TIMESTAMP(6),
                            DataTypes.TIMESTAMP_LTZ(0),
                            DataTypes.TIMESTAMP_LTZ(0)
                        });

        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460, // 2020-07-17 as days since 1970-01-01
            64822000, // 18:00:22 as milliseconds of the day
            64822123,
            64822123,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            null
        };

        testTimeDataTypes(
                fullTypesMySql57Database,
                recordType,
                expectedSnapshot,
                expectedTimeRowAfterUpdate(expectedSnapshot));
    }

    /**
     * Time-type round trip against MySQL 8. Compared with the 5.7 variant the row carries two
     * additional {@code TIMESTAMP_LTZ} fields with precision 3 and 6.
     */
    @Test
    public void testMysql8TimeDataTypes() throws Exception {
        RowType recordType =
                RowType.of(
                        new DataType[] {
                            DataTypes.DECIMAL(20, 0).notNull(),
                            DataTypes.INT(),
                            DataTypes.DATE(),
                            DataTypes.TIME(0),
                            DataTypes.TIME(3),
                            DataTypes.TIME(6),
                            DataTypes.TIMESTAMP(0),
                            DataTypes.TIMESTAMP(3),
                            DataTypes.TIMESTAMP(6),
                            DataTypes.TIMESTAMP_LTZ(0),
                            DataTypes.TIMESTAMP_LTZ(3),
                            DataTypes.TIMESTAMP_LTZ(6),
                            DataTypes.TIMESTAMP_LTZ(0)
                        });

        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460, // 2020-07-17 as days since 1970-01-01
            64822000, // 18:00:22 as milliseconds of the day
            64822123,
            64822123,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
            null
        };

        testTimeDataTypes(
                fullTypesMySql8Database,
                recordType,
                expectedSnapshot,
                expectedTimeRowAfterUpdate(expectedSnapshot));
    }

    /**
     * Verifies the {@code common_types} row before and after {@code big_decimal_c} is set to
     * {@code null}.
     *
     * @param uniqueDatabase database (and therefore MySQL server) to run the scenario against
     * @throws Exception if database setup, job execution, or the JDBC update fails
     */
    private void testCommonDataTypes(UniqueDatabase uniqueDatabase) throws Exception {
        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            Byte.MAX_VALUE,
            (short) 255,
            (short) 255,
            Short.MAX_VALUE,
            65535,
            65535,
            8388607,
            16777215,
            16777215,
            Integer.MAX_VALUE,
            4294967295L,
            4294967295L,
            2147483647L,
            Long.MAX_VALUE,
            DecimalData.fromBigDecimal(new BigDecimal("18446744073709551615"), 20, 0),
            DecimalData.fromBigDecimal(new BigDecimal("18446744073709551615"), 20, 0),
            BinaryStringData.fromString("Hello World"),
            BinaryStringData.fromString("abc"),
            123.102d,
            123.102f,
            123.103f,
            123.104f,
            404.4443d,
            404.4444d,
            404.4445d,
            DecimalData.fromBigDecimal(new BigDecimal("123.4567"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("123.4568"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("123.4569"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("346"), 6, 0),
            BinaryStringData.fromString("34567892.1"),
            false,
            new byte[] {3},
            true,
            true,
            DatatypeConverter.parseHexBinary(
                    "651aed08-390f-4893-b2f1-36923e7b7400".replace("-", "")),
            new byte[] {4, 4, 4, 4, 4, 4, 4, 4},
            BinaryStringData.fromString("text"),
            new byte[] {16},
            new byte[] {16},
            new byte[] {16},
            new byte[] {16},
            2021,
            BinaryStringData.fromString("red"),
            BinaryStringData.fromString("{\"key1\": \"value1\"}"),
            BinaryStringData.fromString("{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}"),
            BinaryStringData.fromString(
                    "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}")
        };

        // The row read after the update differs from the first one in exactly two fields.
        Object[] expectedAfterUpdate = expectedSnapshot.clone();
        expectedAfterUpdate[BIG_DECIMAL_C_INDEX] = null;
        expectedAfterUpdate[JSON_C_INDEX] = BinaryStringData.fromString("{\"key1\":\"value1\"}");

        assertRowBeforeAndAfterUpdate(
                uniqueDatabase,
                "common_types",
                COMMON_TYPES,
                expectedSnapshot,
                "UPDATE common_types SET big_decimal_c = null WHERE id = 1;",
                expectedAfterUpdate);
    }

    /**
     * Interprets a {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} literal as a UTC wall-clock time.
     *
     * @param str timestamp literal in the format accepted by {@link Timestamp#valueOf(String)}
     * @return the instant at which a UTC clock shows {@code str}
     */
    private static Instant toInstant(String str) {
        return Timestamp.valueOf(str).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
    }

    /**
     * Derives the expected time-types row after the update from the expected snapshot row: the
     * {@code TIME(6)} field becomes {@code null} and the last field ({@code timestamp_def_c},
     * reset with {@code = default}) is expected to read back as 2000-01-01 00:00:00 UTC.
     *
     * @param expectedSnapshot expected row before the update; not modified
     * @return a new array holding the expected row after the update
     */
    private static Object[] expectedTimeRowAfterUpdate(Object[] expectedSnapshot) {
        Object[] expected = expectedSnapshot.clone();
        expected[TIME_6_C_INDEX] = null;
        expected[expected.length - 1] =
                LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"));
        return expected;
    }

    /**
     * Verifies the {@code time_types} row before and after {@code time_6_c} is nulled and {@code
     * timestamp_def_c} is reset to its default.
     *
     * @param uniqueDatabase database (and therefore MySQL server) to run the scenario against
     * @param rowType field types used to decode the record
     * @param objArr expected fields of the row read before the update
     * @param objArr2 expected fields of the row read after the update
     * @throws Exception if database setup, job execution, or the JDBC update fails
     */
    private void testTimeDataTypes(
            UniqueDatabase uniqueDatabase, RowType rowType, Object[] objArr, Object[] objArr2)
            throws Exception {
        assertRowBeforeAndAfterUpdate(
                uniqueDatabase,
                "time_types",
                rowType,
                objArr,
                "UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;",
                objArr2);
    }

    /**
     * Shared scenario driver: initializes the database, starts a job reading {@code tableName},
     * checks the first data row, executes {@code updateSql} over JDBC, and checks the change
     * event that follows.
     *
     * <p>The collecting iterator is closed when the scenario ends (successfully or not) so the
     * job behind {@code executeAndCollect()} does not outlive the test.
     *
     * @param database database to initialize and read from
     * @param tableName unqualified name of the captured table
     * @param recordType field types used to decode the record
     * @param expectedSnapshot expected fields of the row read before the update
     * @param updateSql statement executed once between the two assertions
     * @param expectedAfterUpdate expected fields of the after-image read after the update
     * @throws Exception if database setup, job execution, or the JDBC update fails
     */
    private void assertRowBeforeAndAfterUpdate(
            UniqueDatabase database,
            String tableName,
            RowType recordType,
            Object[] expectedSnapshot,
            String updateSql,
            Object[] expectedAfterUpdate)
            throws Exception {
        database.createAndInitialize();

        try (CloseableIterator<?> events =
                env.fromSource(
                                getFlinkSourceProvider(new String[] {tableName}, database)
                                        .getSource(),
                                WatermarkStrategy.noWatermarks(),
                                "Event-Source")
                        .executeAndCollect()) {

            // Two events are fetched and the second one carries the row; the first is
            // presumably a schema/create-table event - confirm against the source.
            assertAfterImage(events, 2, 1, recordType, expectedSnapshot);

            try (Connection connection = database.getJdbcConnection();
                    Statement statement = connection.createStatement()) {
                statement.execute(updateSql);
            }

            assertAfterImage(events, 1, 0, recordType, expectedAfterUpdate);
        }
    }

    /**
     * Pulls {@code fetchCount} events from {@code events}, takes the one at {@code eventIndex},
     * and asserts that the fields of its after-image equal {@code expected}.
     */
    private static void assertAfterImage(
            CloseableIterator<?> events,
            int fetchCount,
            int eventIndex,
            RowType recordType,
            Object[] expected) {
        DataChangeEvent changeEvent =
                (DataChangeEvent)
                        MySqSourceTestUtils.fetchResults(events, fetchCount).get(eventIndex);
        Assertions.assertThat(RecordDataTestUtils.recordFields(changeEvent.after(), recordType))
                .isEqualTo(expected);
    }

    /**
     * Builds a source provider that captures the given tables of {@code uniqueDatabase}, starting
     * from an initial snapshot, with schema-change events disabled and the server time zone set
     * to UTC.
     *
     * @param strArr unqualified table names; each is prefixed with the database name
     * @param uniqueDatabase database supplying host, port, credentials and the database name
     * @return the event source provider of a freshly configured {@code MySqlDataSource}
     */
    private FlinkSourceProvider getFlinkSourceProvider(
            String[] strArr, UniqueDatabase uniqueDatabase) {
        String[] qualifiedTables =
                Arrays.stream(strArr)
                        .map(table -> uniqueDatabase.getDatabaseName() + "." + table)
                        .toArray(String[]::new);

        MySqlSourceConfigFactory configFactory =
                new MySqlSourceConfigFactory()
                        .startupOptions(StartupOptions.initial())
                        .databaseList(new String[] {uniqueDatabase.getDatabaseName()})
                        .tableList(qualifiedTables)
                        .includeSchemaChanges(false)
                        .hostname(uniqueDatabase.getHost())
                        .port(uniqueDatabase.getDatabasePort())
                        .splitSize(10)
                        .fetchSize(2)
                        .username(uniqueDatabase.getUsername())
                        .password(uniqueDatabase.getPassword())
                        .serverTimeZone(ZoneId.of("UTC").toString())
                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));

        return new MySqlDataSource(configFactory).getEventSourceProvider();
    }
}
