package org.apache.paimon.flink.action.cdc.kafka;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.class */
/**
 * Integration test for {@link KafkaSchema#getKafkaSchema}: publishes canal-json records to a
 * Kafka topic and checks the schema returned for that topic.
 */
public class KafkaSchemaITCase extends KafkaActionITCaseBase {

    /** Topic that the test creates, writes to, and then asks the schema for. */
    private static final String TOPIC = "test_kafka_schema";

    /** Resource path handed to {@code readLines} to obtain the canal-json input records. */
    private static final String CANAL_DATA_PATH =
            "kafka/canal/table/schemaevolution/canal-data-1.txt";

    /**
     * Writes the canal-json sample records to {@link #TOPIC} and asserts the field map, table
     * name and database name reported by {@link KafkaSchema#getKafkaSchema}.
     *
     * @throws Exception if the sample records cannot be loaded/published (wrapped with a
     *     descriptive message), or if any later step of the test throws
     */
    @Timeout(60)
    @Test
    public void testKafkaSchema() throws Exception {
        createTestTopic(TOPIC, 1, 1);

        // Only the load-and-publish step is wrapped: the message below describes that step, so
        // failures from schema retrieval must not be reported under it.
        try {
            writeRecordsToKafka(TOPIC, readLines(CANAL_DATA_PATH));
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put("value.format", "canal-json");
        kafkaConfig.put("topic", TOPIC);

        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(Configuration.fromMap(kafkaConfig), TOPIC);

        // Values are data type descriptors; the map is only used for an equality check, so the
        // value type is kept as Object to avoid a raw type without requiring a new import.
        Map<String, Object> expectedFields = new LinkedHashMap<>();
        expectedFields.put("pt", DataTypes.INT());
        expectedFields.put("_id", DataTypes.INT());
        expectedFields.put("v1", DataTypes.VARCHAR(10));

        Assertions.assertThat(kafkaSchema.fields()).isEqualTo(expectedFields);
        Assertions.assertThat(kafkaSchema.tableName()).isEqualTo("schema_evolution_1");
        Assertions.assertThat(kafkaSchema.databaseName()).isEqualTo("paimon_sync_table");
    }
}
