package org.apache.paimon.flink.action.cdc.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.class */
public class KafkaSchemaITCase extends KafkaActionITCaseBase {
    @Timeout(60)
    @Test
    public void testKafkaSchema() throws Exception {
        createTestTopic("test_kafka_schema", 1, 1);
        writeRecordsToKafka("test_kafka_schema", "kafka/canal/table/schemaevolution/canal-data-1.txt", new Object[0]);
        Configuration fromMap = Configuration.fromMap(getBasicKafkaConfig());
        fromMap.setString(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        fromMap.setString(KafkaConnectorOptions.TOPIC.key(), "test_kafka_schema");
        Schema schema = MessageQueueSchemaUtils.getSchema(KafkaActionUtils.getKafkaEarliestConsumer(fromMap), KafkaActionUtils.getDataFormat(fromMap), TypeMapping.defaultMapping());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataField(0, "pt", DataTypes.INT()));
        arrayList.add(new DataField(1, "_id", DataTypes.INT().notNull()));
        arrayList.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
        Assertions.assertThat(schema.fields()).isEqualTo(arrayList);
    }

    @Timeout(60)
    @Test
    public void testTableOptionsChange() throws Exception {
        createTestTopic("test_table_options_change", 1, 1);
        writeRecordsToKafka("test_table_options_change", "kafka/canal/table/optionschange/canal-data-1.txt", new Object[0]);
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "test_table_options_change");
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        JobClient runActionWithDefaultEnv = runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withTableConfig(hashMap).build());
        waitingTables(this.tableName);
        runActionWithDefaultEnv.cancel();
        writeRecordsToKafka("test_table_options_change", "kafka/canal/table/optionschange/canal-data-2.txt", new Object[0]);
        hashMap.put("sink.savepoint.auto-tag", "true");
        hashMap.put("tag.num-retained-max", "5");
        hashMap.put("tag.automatic-creation", "process-time");
        hashMap.put("tag.creation-period", "hourly");
        hashMap.put("tag.creation-delay", "600000");
        hashMap.put("snapshot.time-retained", "1h");
        hashMap.put("snapshot.num-retained.min", "5");
        hashMap.put("snapshot.num-retained.max", "10");
        hashMap.put("changelog-producer", "input");
        KafkaSyncTableAction build = syncTableActionBuilder(basicKafkaConfig).withTableConfig(hashMap).build();
        runActionWithDefaultEnv(build);
        Assertions.assertThat(build.fileStoreTable().options()).containsAllEntriesOf(hashMap);
        Assertions.assertThat(getFileStoreTable(this.tableName).options()).containsAllEntriesOf(hashMap);
    }

    @Timeout(60)
    @Test
    public void testNewlyAddedTablesOptionsChange() throws Exception {
        createTestTopic("test_database_options_change", 1, 1);
        writeRecordsToKafka("test_database_options_change", "kafka/canal/database/schemaevolution/topic0/canal-data-1.txt", new Object[0]);
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "test_database_options_change");
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        JobClient runActionWithDefaultEnv = runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTableConfig(hashMap).build());
        waitingTables("t1");
        runActionWithDefaultEnv.cancel();
        hashMap.put("sink.savepoint.auto-tag", "true");
        hashMap.put("tag.num-retained-max", "5");
        hashMap.put("tag.automatic-creation", "process-time");
        hashMap.put("tag.creation-period", "hourly");
        hashMap.put("tag.creation-delay", "600000");
        hashMap.put("snapshot.time-retained", "1h");
        hashMap.put("snapshot.num-retained.min", "5");
        hashMap.put("snapshot.num-retained.max", "10");
        hashMap.put("changelog-producer", "input");
        writeRecordsToKafka("test_database_options_change", "kafka/canal/database/schemaevolution/topic1/canal-data-1.txt", new Object[0]);
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTableConfig(hashMap).build());
        waitingTables("t2");
        Assertions.assertThat(getFileStoreTable("t2").options()).containsAllEntriesOf(hashMap);
    }
}
