package org.apache.paimon.flink.action.cdc.pulsar;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.class */
public class PulsarSyncDatabaseActionITCase extends PulsarActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolutionMultiTopic() throws Exception {
        this.topics = Arrays.asList("schema_evolution_0", "schema_evolution_1", "schema_evolution_2");
        this.topics.forEach(str -> {
            createTopic(str, 1);
        });
        for (int i = 0; i < 3; i++) {
            try {
                sendMessages(this.topics.get(i), getMessages("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Pulsar.", e);
            }
        }
        Map<String, String> basicPulsarConfig = getBasicPulsarConfig();
        basicPulsarConfig.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        basicPulsarConfig.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");
        basicPulsarConfig.put(PulsarActionUtils.TOPIC.key(), String.join(",", this.topics));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicPulsarConfig).withTableConfig(getBasicTableConfig()).build());
        testSchemaEvolutionImpl(this.topics, false, 3);
    }

    @Timeout(60)
    @Test
    public void testSchemaEvolutionOneTopic() throws Exception {
        this.topics = Collections.singletonList("schema_evolution");
        this.topics.forEach(str -> {
            createTopic(str, 1);
        });
        for (int i = 0; i < 3; i++) {
            try {
                sendMessages(this.topics.get(0), getMessages("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Pulsar.", e);
            }
        }
        Map<String, String> basicPulsarConfig = getBasicPulsarConfig();
        basicPulsarConfig.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        basicPulsarConfig.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");
        basicPulsarConfig.put(PulsarActionUtils.TOPIC.key(), String.join(";", this.topics));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicPulsarConfig).withTableConfig(getBasicTableConfig()).build());
        testSchemaEvolutionImpl(this.topics, true, 3);
    }

    private void testSchemaEvolutionImpl(List<String> list, boolean z, int i) throws Exception {
        String str;
        String str2;
        waitingTables("t1", "t2");
        FileStoreTable fileStoreTable = getFileStoreTable("t1");
        FileStoreTable fileStoreTable2 = getFileStoreTable("t2");
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"k", "v1"});
        List<String> singletonList = Collections.singletonList("k");
        waitForResult(Arrays.asList("+I[1, one]", "+I[3, three]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"k1", "k2", "v1", "v2"});
        List<String> asList = Arrays.asList("k1", "k2");
        waitForResult(Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"), fileStoreTable2, of2, asList);
        for (int i2 = 0; i2 < i; i2++) {
            if (z) {
                try {
                    str2 = list.get(0);
                } catch (Exception e) {
                    throw new Exception("Failed to write canal data to Pulsar.", e);
                }
            } else {
                str2 = list.get(i2);
            }
            sendMessages(str2, getMessages("kafka/canal/database/schemaevolution/topic" + i2 + "/canal-data-2.txt"));
        }
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()}, new String[]{"k", "v1", "v2"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(10)}, new String[]{"k1", "k2", "v1", "v2", "v3"}), asList);
        for (int i3 = 0; i3 < i; i3++) {
            if (z) {
                try {
                    str = list.get(0);
                } catch (Exception e2) {
                    throw new Exception("Failed to write canal data to Pulsar.", e2);
                }
            } else {
                str = list.get(i3);
            }
            sendMessages(str, getMessages("kafka/canal/database/schemaevolution/topic" + i3 + "/canal-data-3.txt"));
        }
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()}, new String[]{"k", "v1", "v2"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]", "+I[10, ten, 100, 1000, long_long_string_10]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(20)}, new String[]{"k1", "k2", "v1", "v2", "v3"}), asList);
    }
}
