package org.apache.flink.cdc.connectors.mysql.debezium.converters;

import io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter;
import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.class */
/**
 * Integration tests that read the {@code date_convert_test} table through the MySQL CDC source
 * with {@link MysqlDebeziumTimeConverter} registered as a Debezium converter.
 *
 * <p>Each test starts its own MySQL container whose {@code default-time_zone} is set to the time
 * zone under test, reads the table through either the DataStream API or the SQL connector, and
 * checks the formatted temporal values. The container is stopped when the test finishes.
 */
public class MysqlDebeziumTimeConverterITCase {
    private static final Logger LOG = LoggerFactory.getLogger(MysqlDebeziumTimeConverterITCase.class);

    /** Root of the test resources; generated {@code my.cnf} files are created below it. */
    private static final File RESOURCE_FOLDER = locateResourceFolder();

    /**
     * Expected {@code test_timestamp} value per row, indexed by {@code id - 1}. The values use the
     * {@code HH:mm:ss} pattern configured through {@code datetime.format.timestamp}.
     */
    private static final String[] EXPECTED_TIMESTAMPS = {"14:23:00", "00:00:00", "00:00:00", "15:04:00"};

    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    /**
     * Per-test scratch directory located under {@link #RESOURCE_FOLDER}. Being a JUnit rule, it is
     * created before and removed after every test, so generated configuration files do not pile up.
     */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder(RESOURCE_FOLDER);

    @Before
    public void setup() throws Exception {
        this.env.setParallelism(1);
    }

    @Test
    public void testReadDateConvertDataStreamInJvmTime() throws Exception {
        testReadDateConvertDataStreamSource(ZoneId.systemDefault().toString());
    }

    @Test
    public void testReadDateConvertDataStreamInAsia() throws Exception {
        testReadDateConvertDataStreamSource("Asia/Shanghai");
    }

    @Test
    public void testReadDateConvertDataStreamInBerlin() throws Exception {
        testReadDateConvertDataStreamSource("Europe/Berlin");
    }

    @Test
    public void testReadDateConvertSQLSourceInAsia() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
    }

    @Test
    public void testReadDateConvertSQLSourceInBerlin() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
    }

    /**
     * Reads three records from {@code date_convert_test} with the DataStream API and validates the
     * converted {@code test_timestamp} field of each record.
     *
     * @param timezone time zone id used for the MySQL server, the connector and the converter
     * @throws Exception if the container, the database initialization or the Flink job fails
     */
    private void testReadDateConvertDataStreamSource(String timezone) throws Exception {
        MySqlContainer mySqlContainer = createMySqlContainer(timezone);
        try {
            startContainers(mySqlContainer, timezone);
            UniqueDatabase database = getUniqueDatabase(mySqlContainer);
            database.createAndInitialize();

            // Run the job on the shared environment so that the parallelism set in setup() and the
            // checkpoint interval below actually apply to the job being executed.
            this.env.enableCheckpointing(1000L);

            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname(database.getHost())
                            .port(database.getDatabasePort())
                            .databaseList(database.getDatabaseName())
                            .tableList(database.getDatabaseName() + ".date_convert_test")
                            .startupOptions(StartupOptions.initial())
                            .serverTimeZone(timezone)
                            .username(database.getUsername())
                            .password(database.getPassword())
                            .debeziumProperties(getDebeziumConfigurations(timezone))
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .build();

            List<String> records =
                    this.env
                            .fromSource(source, WatermarkStrategy.noWatermarks(), "testDataStreamSourceConvertData")
                            .executeAndCollect(3);
            validTimestampValue(records);
        } finally {
            mySqlContainer.stop();
        }
    }

    /**
     * Asserts that the {@code after.test_timestamp} field of every JSON change record matches the
     * expected value for the record's {@code after.id}.
     *
     * @param list JSON change records as emitted by {@link JsonDebeziumDeserializationSchema}
     * @throws JsonProcessingException if a record is not valid JSON
     */
    private void validTimestampValue(List<String> list) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        for (String record : list) {
            JsonNode after = objectMapper.readTree(record).get("after");
            Assert.assertEquals(
                    EXPECTED_TIMESTAMPS[after.get("id").asInt() - 1],
                    after.get("test_timestamp").asText());
        }
    }

    /**
     * Reads {@code date_convert_test} through the {@code mysql-cdc} SQL connector and validates
     * all converted temporal columns of the snapshot rows.
     *
     * @param timezone time zone id used for the MySQL server, the connector and the converter
     * @throws Exception if the container, the database initialization or the Flink job fails
     */
    private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {
        MySqlContainer mySqlContainer = createMySqlContainer(timezone);
        try {
            startContainers(mySqlContainer, timezone);
            UniqueDatabase database = getUniqueDatabase(mySqlContainer);
            database.createAndInitialize();
            this.env.enableCheckpointing(1000L);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(this.env);

            String sourceDdl =
                    String.format(
                            "CREATE TABLE customers ("
                                    + " id BIGINT NOT NULL,"
                                    + " test_timestamp STRING,"
                                    + " test_datetime STRING,"
                                    + " test_date STRING,"
                                    + " test_time STRING,"
                                    + " primary key (id) not enforced"
                                    + ") WITH ("
                                    + " 'connector' = 'mysql-cdc',"
                                    + " 'scan.incremental.snapshot.enabled' = 'true',"
                                    + " 'hostname' = '%s',"
                                    + " 'port' = '%s',"
                                    + " 'username' = '%s',"
                                    + " 'password' = '%s',"
                                    + " 'database-name' = '%s',"
                                    + " 'table-name' = 'date_convert_test',"
                                    + " 'scan.startup.mode' = '%s',"
                                    + " 'server-time-zone' = '%s',"
                                    + " 'debezium.converters' = 'datetime',"
                                    + " 'debezium.datetime.type' = '%s',"
                                    + " 'debezium.database.connectionTimeZone' = '%s',"
                                    + " 'debezium.datetime.format.time' = 'HH:mm:ss',"
                                    + " 'debezium.datetime.format.timezone' = '%s',"
                                    + " 'debezium.datetime.format.timestamp' = 'HH:mm:ss',"
                                    + " 'debezium.datetime.format.default.value.convert' = 'true'"
                                    + ")",
                            mySqlContainer.getHost(),
                            mySqlContainer.getDatabasePort(),
                            database.getUsername(),
                            database.getPassword(),
                            database.getDatabaseName(),
                            "initial",
                            timezone,
                            MysqlDebeziumTimeConverter.class.getName(),
                            timezone,
                            timezone);
            tableEnv.executeSql(sourceDdl);
            checkData(tableEnv.executeSql("select * from customers"));
        } finally {
            mySqlContainer.stop();
        }
    }

    /**
     * Builds the Debezium properties that register {@link MysqlDebeziumTimeConverter} for the
     * DataStream source.
     *
     * @param timezone time zone id passed as connection time zone and converter time zone
     * @return the Debezium properties handed to the source builder
     */
    private Properties getDebeziumConfigurations(String timezone) {
        Properties properties = new Properties();
        properties.setProperty("converters", "datetime");
        properties.setProperty("datetime.type", MysqlDebeziumTimeConverter.class.getName());
        properties.setProperty("datetime.format.timestamp", "HH:mm:ss");
        properties.setProperty("datetime.format.default.value.convert", "false");
        properties.setProperty("database.connectionTimeZone", timezone);
        properties.setProperty("datetime.format.timezone", timezone);
        LOG.info("Supplied debezium properties: {}", properties);
        return properties;
    }

    /**
     * Collects the snapshot rows of the given query result and compares them, ignoring order,
     * with the expected rows.
     *
     * @param tableResult result of the {@code select} over the CDC table
     * @throws Exception if closing the result iterator fails
     */
    private void checkData(TableResult tableResult) throws Exception {
        List<String> expectedRows =
                new ArrayList<>(
                        Arrays.asList(
                                "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
                                "+I[3, 00:00:00, null, null, 00:01:20]",
                                "+I[2, 00:00:00, null, null, 00:00:00]",
                                "+I[4, 15:04:00, null, null, 00:01:10]"));
        // Closing the iterator releases the resources held by the still-running streaming query.
        try (CloseableIterator<Row> rows = tableResult.collect()) {
            MySqlSourceTestBase.assertEqualsInAnyOrder(expectedRows, fetchRows(rows, expectedRows.size()));
        }
    }

    /**
     * Pulls at most {@code i} rows from the iterator and returns their string form.
     *
     * @param it source of rows
     * @param i maximum number of rows to fetch
     * @return the fetched rows rendered with {@link Row#toString()}
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        List<String> rows = new ArrayList<>(i);
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            rows.add(it.next().toString());
        }
        return rows;
    }

    /**
     * Creates (without starting) a MySQL container configured with {@link MySqlVersion#V5_7} and a
     * generated {@code my.cnf} that sets the given server time zone.
     *
     * @param timezone time zone id written to {@code default-time_zone}
     * @return the configured, not yet started container
     */
    protected MySqlContainer createMySqlContainer(String timezone) {
        // The explicit cast keeps the chain compiling if the fluent setters are declared with an
        // erased self type and therefore return a supertype of MySqlContainer.
        return (MySqlContainer)
                new MySqlContainer(MySqlVersion.V5_7)
                        .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone))
                        .withSetupSQL("docker/setup.sql")
                        .withDatabaseName("flink-test")
                        .withUsername("flinkuser")
                        .withPassword("flinkpw")
                        .withLogConsumer(new Slf4jLogConsumer(LOG));
    }

    /**
     * Starts the given container and blocks until it is up.
     *
     * @param mySqlContainer container to start
     * @param timezone time zone id, used for logging only
     */
    protected void startContainers(MySqlContainer mySqlContainer, String timezone) {
        LOG.info("Starting containers with timezone {} ...", timezone);
        Startables.deepStart(Stream.of(mySqlContainer)).join();
        LOG.info("Containers are started.");
        LOG.info("JVM System Clock Zone Id : {}", ZoneId.systemDefault());
    }

    /**
     * Creates the handle for the {@code date_convert_test} database on the given container.
     *
     * @param mySqlContainer running container hosting the database
     * @return the database handle; callers still have to initialize it
     */
    protected UniqueDatabase getUniqueDatabase(MySqlContainer mySqlContainer) {
        return new UniqueDatabase(mySqlContainer, "date_convert_test", "mysqluser", "mysqlpw");
    }

    /**
     * Writes a {@code my.cnf} with the given {@code default-time_zone} into a fresh sub-directory
     * of {@link #tempFolder}.
     *
     * @param timezone time zone id written to {@code default-time_zone}
     * @return the path of the file relative to {@link #RESOURCE_FOLDER}
     *     (presumably resolved as a classpath resource by {@code MySqlContainer}; verify there)
     * @throws RuntimeException if the directory or the file cannot be created or written
     */
    private String buildMySqlConfigWithTimezone(String timezone) {
        try {
            File configDir = tempFolder.newFolder(UUID.randomUUID().toString());
            Path cnf = Files.createFile(Paths.get(configDir.getPath(), "my.cnf"));
            String content =
                    "[mysqld]\n"
                            + "binlog_format = row\n"
                            + "log_bin = mysql-bin\n"
                            + "server-id = 223344\n"
                            + "binlog_row_image = FULL\n"
                            + "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n"
                            + "default-time_zone = '" + timezone + "'\n";
            Files.write(cnf, Collections.singleton(content), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
            return Paths.get(RESOURCE_FOLDER.getAbsolutePath()).relativize(cnf).toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create my.cnf file.", e);
        }
    }

    /**
     * Resolves the directory that backs the root of the test classpath.
     *
     * @return the classpath root as a directory
     * @throws IllegalStateException if the classpath root cannot be resolved to a local directory
     */
    private static File locateResourceFolder() {
        try {
            return Paths.get(
                            Objects.requireNonNull(
                                            MySqlValidatorTest.class.getClassLoader().getResource("."))
                                    .toURI())
                    .toFile();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to locate the test resource folder.", e);
        }
    }
}
