package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.class */
public class KafkaOggSyncTableActionITCase extends KafkaActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("schemaevolution");
    }

    private void runSingleTableSchemaEvolution(String str) throws Exception {
        createTestTopic("schema_evolution", 1, 1);
        try {
            writeRecordsToKafka("schema_evolution", readLines(String.format("kafka/ogg/table/%s/ogg-data-1.txt", str)));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_evolution");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            testSchemaEvolutionImpl("schema_evolution", str);
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    private void testSchemaEvolutionImpl(String str, String str2) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
        RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]"), fileStoreTable, of, singletonList);
        try {
            writeRecordsToKafka(str, readLines(String.format("kafka/ogg/table/%s/ogg-data-2.txt", str2)));
            waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175, NULL]", "+I[102, car battery, 12V car battery, 8.100000381469727, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age"}), singletonList);
            try {
                writeRecordsToKafka(str, readLines(String.format("kafka/ogg/table/%s/ogg-data-3.txt", str2)));
                waitForResult(Arrays.asList("+I[102, car battery, 12V car battery, 8.100000381469727, NULL, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18, NULL]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", "+I[107, rocks, box of assorted rocks, 5.300000190734863, NULL, NULL]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age", "address"}), singletonList);
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testNotSupportFormat() throws Exception {
        createTestTopic("not_support", 1, 1);
        try {
            writeRecordsToKafka("not_support", readLines("kafka/ogg/table/schemaevolution/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "togg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "not_support");
            KafkaSyncTableAction build = syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build();
            build.getClass();
            Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(UnsupportedOperationException.class, "This format: togg-json is not supported.")});
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testAssertSchemaCompatible() throws Exception {
        createTestTopic("assert_schema_compatible", 1, 1);
        try {
            writeRecordsToKafka("assert_schema_compatible", readLines("kafka/ogg/table/schemaevolution/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "assert_schema_compatible");
            createFileStoreTable(RowType.of(new DataType[]{DataTypes.STRING(), DataTypes.STRING()}, new String[]{"k", "v1"}), Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap());
            KafkaSyncTableAction build = syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build();
            build.getClass();
            Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, "Paimon schema and source table schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nSource table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]")});
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionSpecific() throws Exception {
        createTestTopic("start_up_specific", 1, 1);
        try {
            writeRecordsToKafka("start_up_specific", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_specific");
            basicKafkaConfig.put("scan.startup.mode", "specific-offsets");
            basicKafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            waitForResult(Collections.singletonList("+I[102, car battery, 12V car battery, 8.100000381469727]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionLatest() throws Exception {
        createTestTopic("start_up_latest", 1, 1);
        try {
            writeRecordsToKafka("start_up_latest", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_latest");
            basicKafkaConfig.put("scan.startup.mode", "latest-offset");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            Thread.sleep(5000L);
            FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
            try {
                writeRecordsToKafka("start_up_latest", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionTimestamp() throws Exception {
        createTestTopic("start_up_timestamp", 1, 1);
        try {
            writeRecordsToKafka("start_up_timestamp", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_timestamp");
            basicKafkaConfig.put("scan.startup.mode", "timestamp");
            basicKafkaConfig.put("scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            try {
                writeRecordsToKafka("start_up_timestamp", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionEarliest() throws Exception {
        createTestTopic("start_up_earliest", 1, 1);
        try {
            writeRecordsToKafka("start_up_earliest", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_earliest");
            basicKafkaConfig.put("scan.startup.mode", "earliest-offset");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            try {
                writeRecordsToKafka("start_up_earliest", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionGroup() throws Exception {
        createTestTopic("start_up_group", 1, 1);
        try {
            writeRecordsToKafka("start_up_group", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_group");
            basicKafkaConfig.put("scan.startup.mode", "group-offsets");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            try {
                writeRecordsToKafka("start_up_group", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testComputedColumn() throws Exception {
        createTestTopic("computed_column", 1, 1);
        try {
            writeRecordsToKafka("computed_column", readLines("kafka/ogg/table/computedcolumn/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "computed_column");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPartitionKeys("_year").withPrimaryKeys2("_id", "_year").withComputedColumnArgs("_year=year(_date)").withTableConfig(getBasicTableConfig()).build());
            waitForResult(Collections.singletonList("+I[101, 2023-03-23, 2023]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT().notNull()}, new String[]{"_id", "_date", "_year"}), Arrays.asList("_id", "_year"));
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testCDCOperations() throws Exception {
        createTestTopic("event", 1, 1);
        try {
            writeRecordsToKafka("event", readLines("kafka/ogg/table/event/event-insert.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
            basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "event");
            runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPrimaryKeys2("id").withTableConfig(getBasicTableConfig()).build());
            FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
            List<String> singletonList = Collections.singletonList("id");
            RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
            waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, scooter, Big 2-wheel scooter , 5.179999828338623]"), fileStoreTable, of, singletonList);
            try {
                writeRecordsToKafka("event", readLines("kafka/ogg/table/event/event-update.txt"));
                waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, scooter, Big 2-wheel scooter , 8.170000076293945]"), fileStoreTable, of, singletonList);
                try {
                    writeRecordsToKafka("event", readLines("kafka/ogg/table/event/event-delete.txt"));
                    waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]"), fileStoreTable, of, singletonList);
                } catch (Exception e) {
                    throw new Exception("Failed to write canal data to Kafka.", e);
                }
            } catch (Exception e2) {
                throw new Exception("Failed to write canal data to Kafka.", e2);
            }
        } catch (Exception e3) {
            throw new Exception("Failed to write canal data to Kafka.", e3);
        }
    }
}
