package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.class */
public class KafkaMaxwellSyncDatabaseActionITCase extends KafkaActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolutionMultiTopic() throws Exception {
        List<String> asList = Arrays.asList("schema_evolution_0", "schema_evolution_1");
        asList.forEach(str -> {
            createTestTopic(str, 1, 1);
        });
        for (int i = 0; i < 2; i++) {
            try {
                writeRecordsToKafka(asList.get(i), readLines("kafka/maxwell/database/schemaevolution/topic" + i + "/maxwell-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join(";", asList));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTableConfig(getBasicTableConfig()).build());
        testSchemaEvolutionImpl(asList, false, 2);
    }

    @Timeout(60)
    @Test
    public void testSchemaEvolutionOneTopic() throws Exception {
        List<String> singletonList = Collections.singletonList("schema_evolution");
        singletonList.forEach(str -> {
            createTestTopic(str, 1, 1);
        });
        for (int i = 0; i < 2; i++) {
            try {
                writeRecordsToKafka(singletonList.get(0), readLines("kafka/maxwell/database/schemaevolution/topic" + i + "/maxwell-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join(";", singletonList));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTableConfig(getBasicTableConfig()).build());
        testSchemaEvolutionImpl(singletonList, true, 2);
    }

    private void testSchemaEvolutionImpl(List<String> list, boolean z, int i) throws Exception {
        String str;
        waitingTables("t1", "t2");
        FileStoreTable fileStoreTable = getFileStoreTable("t1");
        FileStoreTable fileStoreTable2 = getFileStoreTable("t2");
        RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList2 = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), fileStoreTable2, of2, singletonList2);
        for (int i2 = 0; i2 < i; i2++) {
            if (z) {
                try {
                    str = list.get(0);
                } catch (Exception e) {
                    throw new Exception("Failed to write maxwell data to Kafka.", e);
                }
            } else {
                str = list.get(i2);
            }
            writeRecordsToKafka(str, readLines("kafka/maxwell/database/schemaevolution/topic" + i2 + "/maxwell-data-2.txt"));
        }
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[102, car battery, 12V car battery, 8.1, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age"}), singletonList);
        waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, Beijing]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "address"}), singletonList2);
    }

    @Test
    public void testTopicIsEmpty() {
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        KafkaSyncDatabaseAction build = syncDatabaseActionBuilder(basicKafkaConfig).build();
        build.getClass();
        Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, "kafka-conf [topic] must be specified.")});
    }

    @Timeout(60)
    @Test
    public void testTableAffixMultiTopic() throws Exception {
        createFileStoreTable("test_prefix_t1_test_suffix", RowType.of(new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), Collections.singletonList("id"), Collections.emptyMap());
        List<String> asList = Arrays.asList("prefix_suffix_0", "prefix_suffix_1");
        asList.forEach(str -> {
            createTestTopic(str, 1, 1);
        });
        for (int i = 0; i < asList.size(); i++) {
            try {
                writeRecordsToKafka(asList.get(i), readLines("kafka/maxwell/database/prefixsuffix/topic" + i + "/maxwell-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join(";", asList));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build());
        testTableAffixImpl(asList, false, 2);
    }

    @Timeout(60)
    @Test
    public void testTableAffixOneTopic() throws Exception {
        createFileStoreTable("test_prefix_t1_test_suffix", RowType.of(new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), Collections.singletonList("id"), Collections.emptyMap());
        List<String> singletonList = Collections.singletonList("prefix_suffix");
        singletonList.forEach(str -> {
            createTestTopic(str, 1, 1);
        });
        for (int i = 0; i < 2; i++) {
            try {
                writeRecordsToKafka(singletonList.get(0), readLines("kafka/maxwell/database/prefixsuffix/topic" + i + "/maxwell-data-1.txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join(";", singletonList));
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build());
        testTableAffixImpl(singletonList, true, 2);
    }

    private void testTableAffixImpl(List<String> list, boolean z, int i) throws Exception {
        String str;
        waitingTables("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
        FileStoreTable fileStoreTable = getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable fileStoreTable2 = getFileStoreTable("test_prefix_t2_test_suffix");
        RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList2 = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), fileStoreTable2, of2, singletonList2);
        for (int i2 = 0; i2 < i; i2++) {
            if (z) {
                try {
                    str = list.get(0);
                } catch (Exception e) {
                    throw new Exception("Failed to write maxwell data to Kafka.", e);
                }
            } else {
                str = list.get(i2);
            }
            writeRecordsToKafka(str, readLines("kafka/maxwell/database/prefixsuffix/topic" + i2 + "/maxwell-data-2.txt"));
        }
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, Beijing]", "+I[102, car battery, 12V car battery, 8.1, Shanghai]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "address"}), singletonList);
        waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age"}), singletonList2);
    }

    @Timeout(60)
    @Test
    public void testIncludingTables() throws Exception {
        includingAndExcludingTablesImpl("flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignore"));
    }

    @Timeout(60)
    @Test
    public void testExcludingTables() throws Exception {
        includingAndExcludingTablesImpl(null, "flink|paimon.+", Collections.singletonList("ignore"), Arrays.asList("flink", "paimon_1", "paimon_2"));
    }

    @Timeout(60)
    @Test
    public void testIncludingAndExcludingTables() throws Exception {
        includingAndExcludingTablesImpl("flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "ignore"));
    }

    private void includingAndExcludingTablesImpl(@Nullable String str, @Nullable String str2, List<String> list, List<String> list2) throws Exception {
        List singletonList = Collections.singletonList("include_exclude" + UUID.randomUUID());
        singletonList.forEach(str3 -> {
            createTestTopic(str3, 1, 1);
        });
        try {
            writeRecordsToKafka((String) singletonList.get(0), readLines("kafka/maxwell/database/include/topic0/maxwell-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join(";", singletonList));
            runActionWithDefaultEnv(syncDatabaseActionBuilder(basicKafkaConfig).includingTables(str).excludingTables(str2).withTableConfig(getBasicTableConfig()).build());
            waitingTables(list);
            assertTableNotExists(list2);
        } catch (Exception e) {
            throw new Exception("Failed to write maxwell data to Kafka.", e);
        }
    }
}
