package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.class */
public class KafkaOggSyncTableActionITCase extends KafkaActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("schemaevolution");
    }

    private void runSingleTableSchemaEvolution(String str) throws Exception {
        createTestTopic("schema_evolution", 1, 1);
        try {
            writeRecordsToKafka("schema_evolution", readLines(String.format("kafka/ogg/table/%s/ogg-data-1.txt", str)));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "schema_evolution");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            testSchemaEvolutionImpl("schema_evolution", str);
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    private void testSchemaEvolutionImpl(String str, String str2) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
        RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"});
        List<String> singletonList = Collections.singletonList("id");
        waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]"), fileStoreTable, of, singletonList);
        try {
            writeRecordsToKafka(str, readLines(String.format("kafka/ogg/table/%s/ogg-data-2.txt", str2)));
            waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175, NULL]", "+I[102, car battery, 12V car battery, 8.100000381469727, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age"}), singletonList);
            try {
                writeRecordsToKafka(str, readLines(String.format("kafka/ogg/table/%s/ogg-data-3.txt", str2)));
                waitForResult(Arrays.asList("+I[102, car battery, 12V car battery, 8.100000381469727, NULL, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18, NULL]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", "+I[107, rocks, box of assorted rocks, 5.300000190734863, NULL, NULL]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight", "age", "address"}), singletonList);
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testNotSupportFormat() throws Exception {
        createTestTopic("not_support", 1, 1);
        try {
            writeRecordsToKafka("not_support", readLines("kafka/ogg/table/schemaevolution/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "togg-json");
            basicKafkaConfig.put("topic", "not_support");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            KafkaSyncTableAction kafkaSyncTableAction = new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap);
            Assertions.assertThatThrownBy(() -> {
                kafkaSyncTableAction.build(executionEnvironment);
            }).isInstanceOf(UnsupportedOperationException.class).hasMessage("This format: togg-json is not supported.");
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testAssertSchemaCompatible() throws Exception {
        createTestTopic("assert_schema_compatible", 1, 1);
        try {
            writeRecordsToKafka("assert_schema_compatible", readLines("kafka/ogg/table/schemaevolution/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "assert_schema_compatible");
            Catalog catalog = catalog();
            catalog.createDatabase(this.database, true);
            catalog.createTable(Identifier.create(this.database, this.tableName), Schema.newBuilder().column("k", DataTypes.STRING()).column("v1", DataTypes.STRING()).primaryKey(new String[]{"k"}).build(), false);
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            KafkaSyncTableAction kafkaSyncTableAction = new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap);
            Assertions.assertThatThrownBy(() -> {
                kafkaSyncTableAction.build(executionEnvironment);
            }).isInstanceOf(IllegalArgumentException.class).hasMessage("Paimon schema and Kafka schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nKafka fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]");
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionSpecific() throws Exception {
        createTestTopic("start_up_specific", 1, 1);
        try {
            writeRecordsToKafka("start_up_specific", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "start_up_specific");
            basicKafkaConfig.put("scan.startup.mode", "specific-offsets");
            basicKafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            waitForResult(Collections.singletonList("+I[102, car battery, 12V car battery, 8.100000381469727]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionLatest() throws Exception {
        createTestTopic("start_up_latest", 1, 1);
        try {
            writeRecordsToKafka("start_up_latest", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "start_up_latest");
            basicKafkaConfig.put("scan.startup.mode", "latest-offset");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(1);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            waitTablesCreated(this.tableName);
            Thread.sleep(5000L);
            FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
            try {
                writeRecordsToKafka("start_up_latest", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionTimestamp() throws Exception {
        createTestTopic("start_up_timestamp", 1, 1);
        try {
            writeRecordsToKafka("start_up_timestamp", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "start_up_timestamp");
            basicKafkaConfig.put("scan.startup.mode", "timestamp");
            basicKafkaConfig.put("scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_timestamp", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionEarliest() throws Exception {
        createTestTopic("start_up_earliest", 1, 1);
        try {
            writeRecordsToKafka("start_up_earliest", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "start_up_earliest");
            basicKafkaConfig.put("scan.startup.mode", "earliest-offset");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_earliest", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionGroup() throws Exception {
        createTestTopic("start_up_group", 1, 1);
        try {
            writeRecordsToKafka("start_up_group", readLines("kafka/ogg/table/startupmode/ogg-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "ogg-json");
            basicKafkaConfig.put("topic", "start_up_group");
            basicKafkaConfig.put("scan.startup.mode", "group-offsets");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_group", readLines("kafka/ogg/table/startupmode/ogg-data-2.txt"));
                waitForResult(Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]", "+I[102, car battery, 12V car battery, 8.100000381469727]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"id", "name", "description", "weight"}), Collections.singletonList("id"));
            } catch (Exception e) {
                throw new Exception("Failed to write ogg data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write ogg data to Kafka.", e2);
        }
    }
}
