package org.apache.paimon.flink.action.cdc.pulsar;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.class */
/**
 * Integration test that feeds canal-json messages from a single Pulsar topic into a Paimon table
 * and verifies the table content and row type after each batch of messages.
 */
public class PulsarSyncTableActionITCase extends PulsarActionITCaseBase {

    /** Runs the single-table schema evolution scenario using the "schemaevolution" fixtures. */
    @Test
    @Timeout(120)
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("schemaevolution");
    }

    /**
     * Creates the topic, publishes the first batch of messages, starts the sync action and then
     * delegates to the step-by-step verification.
     *
     * @param sourceDir name of the fixture directory substituted into the canal data file paths
     */
    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
        final String topic = "schema_evolution";
        this.topics = Collections.singletonList(topic);
        createTopic(topic, 1);

        // The first batch is published before the sync action is started.
        String firstBatch = String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir);
        sendMessages(topic, getMessages(firstBatch));

        Map<String, String> pulsarConfig = getBasicPulsarConfig();
        // NOTE(review): "-1" presumably disables periodic partition discovery - confirm against
        // the Flink Pulsar connector documentation.
        pulsarConfig.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        pulsarConfig.put(PulsarActionUtils.TOPIC.key(), topic);
        pulsarConfig.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");

        runActionWithDefaultEnv(
                syncTableActionBuilder(pulsarConfig)
                        .withPartitionKeys("pt")
                        .withPrimaryKeys2("pt", "_id")
                        .withTableConfig(getBasicTableConfig())
                        .build());

        testSchemaEvolutionImpl(topic, sourceDir);
    }

    /**
     * Publishes the remaining message batches one at a time and, after each one, waits until the
     * table matches the expected rows and row type.
     *
     * @param topic topic the additional batches are sent to
     * @param sourceDir name of the fixture directory substituted into the canal data file paths
     */
    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
        FileStoreTable table = getFileStoreTable(this.tableName);
        List<String> primaryKeys = Arrays.asList("pt", "_id");

        // Step 1: rows from the batch that was sent before the action started.
        DataType[] initialTypes = {
            DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)
        };
        String[] initialNames = {"pt", "_id", "v1"};
        RowType initialRowType = RowType.of(initialTypes, initialNames);
        List<String> initialRows =
                Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
        waitForResult(initialRows, table, initialRowType, primaryKeys);

        // Step 2: expected row type gains an INT column v2.
        sendMessages(
                topic,
                getMessages(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
        DataType[] withV2Types = {
            DataTypes.INT().notNull(),
            DataTypes.INT().notNull(),
            DataTypes.VARCHAR(10),
            DataTypes.INT()
        };
        String[] fourColumnNames = {"pt", "_id", "v1", "v2"};
        RowType withV2RowType = RowType.of(withV2Types, fourColumnNames);
        List<String> withV2Rows =
                Arrays.asList(
                        "+I[1, 1, one, NULL]",
                        "+I[1, 2, second, NULL]",
                        "+I[2, 3, three, 30]",
                        "+I[2, 4, four, NULL]",
                        "+I[1, 5, five, 50]",
                        "+I[1, 6, six, 60]");
        waitForResult(withV2Rows, table, withV2RowType, primaryKeys);

        // Step 3: expected type of v2 changes from INT to BIGINT.
        sendMessages(
                topic,
                getMessages(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
        DataType[] bigintV2Types = {
            DataTypes.INT().notNull(),
            DataTypes.INT().notNull(),
            DataTypes.VARCHAR(10),
            DataTypes.BIGINT()
        };
        RowType bigintV2RowType = RowType.of(bigintV2Types, new String[] {"pt", "_id", "v1", "v2"});
        List<String> bigintV2Rows =
                Arrays.asList(
                        "+I[1, 1, one, NULL]",
                        "+I[1, 2, second, NULL]",
                        "+I[2, 3, three, 30000000000]",
                        "+I[2, 4, four, NULL]",
                        "+I[1, 6, six, 60]",
                        "+I[2, 7, seven, 70000000000]",
                        "+I[2, 8, eight, 80000000000]");
        waitForResult(bigintV2Rows, table, bigintV2RowType, primaryKeys);

        // Step 4: expected v1 is VARCHAR(20) and columns v3, v4, v5 appear.
        sendMessages(
                topic,
                getMessages(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
        DataType[] sevenColumnTypes = {
            DataTypes.INT().notNull(),
            DataTypes.INT().notNull(),
            DataTypes.VARCHAR(20),
            DataTypes.BIGINT(),
            DataTypes.DECIMAL(8, 3),
            DataTypes.VARBINARY(10),
            DataTypes.FLOAT()
        };
        String[] sevenColumnNames = {"pt", "_id", "v1", "v2", "v3", "v4", "v5"};
        RowType sevenColumnRowType = RowType.of(sevenColumnTypes, sevenColumnNames);
        List<String> sevenColumnRows =
                Arrays.asList(
                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
                        "+I[2, 4, four, NULL, NULL, NULL, NULL]",
                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        waitForResult(sevenColumnRows, table, sevenColumnRowType, primaryKeys);

        // Step 5: expected v4 is VARBINARY(20) and v5 is DOUBLE.
        sendMessages(
                topic,
                getMessages(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
        DataType[] widenedTypes = {
            DataTypes.INT().notNull(),
            DataTypes.INT().notNull(),
            DataTypes.VARCHAR(20),
            DataTypes.BIGINT(),
            DataTypes.DECIMAL(8, 3),
            DataTypes.VARBINARY(20),
            DataTypes.DOUBLE()
        };
        RowType widenedRowType =
                RowType.of(widenedTypes, new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        List<String> widenedRows =
                Arrays.asList(
                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
                        "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
        waitForResult(widenedRows, table, widenedRowType, primaryKeys);
    }
}
