package org.apache.paimon.flink.kafka;

import java.io.IOException;
import java.util.UUID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/kafka/LogSystemITCase.class */
/**
 * Integration tests for Paimon tables that use Kafka as their log system.
 *
 * <p>Every test runs against the {@code PAIMON} catalog created in {@link #before()} unless it
 * explicitly creates and switches to another catalog.
 */
public class LogSystemITCase extends KafkaTableTestBase {

    /** DDL of the datagen source producing words of length 1, shared by the log read tests. */
    private static final String WORD_TABLE_DDL =
            "CREATE TEMPORARY TABLE word_table (\n"
                    + "    word STRING\n"
                    + ") WITH (\n"
                    + "    'connector' = 'datagen',\n"
                    + "    'fields.word.length' = '1'\n"
                    + ");";

    /** Creates the {@code PAIMON} catalog on a temporary warehouse and makes it current. */
    @BeforeEach
    public void before() throws IOException {
        tEnv.executeSql(
                String.format(
                        "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s')",
                        getTempDirPath()));
        tEnv.useCatalog("PAIMON");
    }

    /** Reads ten rows from an append-only table using eventual log consistency. */
    @Test
    public void testAppendOnlyWithEventual() throws Exception {
        createTopicIfNotExists("T", 1);
        env.getCheckpointConfig().disableCheckpointing();
        env.setParallelism(1);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE T (i INT, j INT) WITH ("
                                + "'log.system'='kafka', "
                                + "'log.consistency'='eventual', "
                                + "'kafka.bootstrap.servers'='%s', "
                                + "'kafka.topic'='T')",
                        getBootstrapServers()));
        tEnv.executeSql("CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen')");

        TableResult insertJob = tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
        BlockingIterator<Row, Row> rows = null;
        try {
            rows = BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect());
            Assertions.assertThat(rows.collect(10)).hasSize(10);
        } finally {
            // Clean up even when the assertion fails so no job outlives the test.
            stopJobs(insertJob, rows);
        }
    }

    /** Writes three rows and reads them back with {@code streaming-read-mode=file}. */
    @Test
    public void testReadFromFile() throws Exception {
        createTopicIfNotExists("test-double-sink", 1);
        env.getCheckpointConfig().setCheckpointInterval(3000L);
        env.setParallelism(1);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE kafka_file_double_sink (\n"
                                + " word STRING ,\n"
                                + "    cnt BIGINT,\n"
                                + "      PRIMARY KEY (word) NOT ENFORCED\n"
                                + ")\n"
                                + "WITH (\n"
                                + " 'merge-engine' = 'aggregation',\n"
                                + "  'changelog-producer' = 'full-compaction',\n"
                                + "    'log.system' = 'kafka',\n"
                                + "    'streaming-read-mode'='file',\n"
                                + "    'fields.cnt.aggregate-function' = 'sum',\n"
                                + "    'kafka.bootstrap.servers' = '%s',\n"
                                + "    'kafka.topic' = 'test-double-sink',\n"
                                + "    'kafka.transaction.timeout.ms'='30000'\n"
                                + "\n"
                                + ");",
                        getBootstrapServers()));

        TableResult insertJob =
                tEnv.executeSql(
                        "INSERT INTO kafka_file_double_sink values('a',1),('b',2),('c',3);");
        BlockingIterator<Row, Row> rows = null;
        try {
            rows =
                    BlockingIterator.of(
                            tEnv.executeSql("SELECT * FROM kafka_file_double_sink").collect());
            Assertions.assertThat(rows.collect(3))
                    .containsExactlyInAnyOrder(Row.of("a", 1L), Row.of("b", 2L), Row.of("c", 3L));
        } finally {
            stopJobs(insertJob, rows);
        }
    }

    /** Reads ten rows of an aggregated word count with {@code streaming-read-mode=log}. */
    @Test
    public void testReadFromLog() throws Exception {
        createTopicIfNotExists("test-single-sink", 1);
        env.getCheckpointConfig().disableCheckpointing();
        env.setParallelism(1);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE kafka_file_single_sink (\n"
                                + " word STRING ,\n"
                                + "    cnt BIGINT,\n"
                                + "      PRIMARY KEY (word) NOT ENFORCED\n"
                                + ")\n"
                                + "WITH (\n"
                                + " 'merge-engine' = 'aggregation',\n"
                                + "    'changelog-producer' = 'full-compaction',\n"
                                + "    'log.consistency' = 'eventual',\n"
                                + "    'log.system' = 'kafka',\n"
                                + "    'streaming-read-mode'='log',\n"
                                + "    'kafka.bootstrap.servers' = '%s',\n"
                                + "    'kafka.topic' = 'test-single-sink',\n"
                                + "    'kafka.transaction.timeout.ms'='30000'\n"
                                + "\n"
                                + ");",
                        getBootstrapServers()));
        tEnv.executeSql(WORD_TABLE_DDL);

        TableResult insertJob =
                tEnv.executeSql(
                        "INSERT INTO kafka_file_single_sink SELECT word, COUNT(*) FROM word_table GROUP BY word;");
        BlockingIterator<Row, Row> rows = null;
        try {
            rows =
                    BlockingIterator.of(
                            tEnv.executeSql("SELECT * FROM kafka_file_single_sink").collect());
            Assertions.assertThat(rows.collect(10)).hasSize(10);
        } finally {
            stopJobs(insertJob, rows);
        }
    }

    /**
     * Expects a {@link ValidationException} as the root cause when inserting into a table that
     * requests {@code streaming-read-mode=log} but declares no log system.
     */
    @Test
    public void testReadFromLogWithOutSteamingReadMode() throws Exception {
        createTopicIfNotExists("test-single-sink", 1);
        env.setParallelism(1);

        tEnv.executeSql(
                "CREATE TABLE kafka_file_single_sink (\n"
                        + " word STRING ,\n"
                        + "    cnt BIGINT,\n"
                        + "      PRIMARY KEY (word) NOT ENFORCED\n"
                        + ")\n"
                        + "WITH (\n"
                        + " 'merge-engine' = 'aggregation',\n"
                        + "    'changelog-producer' = 'full-compaction',\n"
                        + "    'streaming-read-mode'='log'\n"
                        + ");");
        tEnv.executeSql(WORD_TABLE_DDL);

        Assertions.assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "INSERT INTO kafka_file_single_sink SELECT word, COUNT(*) FROM word_table GROUP BY word;"))
                .getRootCause()
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        "File store continuous reading does not support the log streaming read mode.");
    }

    /**
     * Verifies topic handling in a catalog with {@code log.system.auto-register=true}: topics
     * appear on table creation, are not created when table creation fails, and are removed when
     * the table is dropped.
     */
    @Timeout(60)
    @Test
    public void testLogSystemAutoRegister() throws TableNotExistException {
        tEnv.executeSql(
                String.format(
                        "CREATE CATALOG PAIMON_REGISTER WITH ('type'='paimon', 'warehouse'='%s', 'log.system.auto-register'='true')",
                        getTempDirPath()));
        tEnv.useCatalog("PAIMON_REGISTER");
        env.getCheckpointConfig().disableCheckpointing();
        env.setParallelism(1);

        // Explicit topic name with an explicit partition count.
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE T (i INT, j INT) WITH ("
                                + "'log.system'='kafka', "
                                + "'log.system.partitions'='2', "
                                + "'kafka.bootstrap.servers'='%s', "
                                + "'kafka.topic'='Tt')",
                        getBootstrapServers()));
        checkTopicExists("Tt", 2, 1);

        // Only 'bucket' is given; the topic is expected to have the same number of partitions.
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE T2 (i INT, j INT) WITH ("
                                + "'log.system'='kafka', "
                                + "'bucket'='2', "
                                + "'kafka.bootstrap.servers'='%s', "
                                + "'kafka.topic'='T2')",
                        getBootstrapServers()));
        checkTopicExists("T2", 2, 1);

        // No topic name is given for T1, so look up the registered one in T1's own options.
        // (Previously this read the options of table T, which only re-checked topic 'Tt'.)
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE T1 (i INT, j INT) WITH ("
                                + "'log.system'='kafka', "
                                + "'log.system.partitions'='2', "
                                + "'kafka.bootstrap.servers'='%s')",
                        getBootstrapServers()));
        Catalog catalog = tEnv.getCatalog("PAIMON_REGISTER").get();
        String generatedTopic =
                catalog.getTable(ObjectPath.fromString("default.T1"))
                        .getOptions()
                        .get("kafka.topic");
        Assertions.assertThat(generatedTopic).isNotNull();
        checkTopicExists(generatedTopic, 2, 1);

        // Creating an already existing table must fail without registering topic 'T1'.
        Assertions.assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        String.format(
                                                "CREATE TABLE T (i INT, j INT) WITH ("
                                                        + "'log.system'='kafka', "
                                                        + "'log.system.partitions'='2', "
                                                        + "'kafka.bootstrap.servers'='%s', "
                                                        + "'kafka.topic'='T1')",
                                                getBootstrapServers())))
                .isInstanceOf(ValidationException.class)
                .hasMessage("Could not execute CreateTable in path `PAIMON_REGISTER`.`default`.`T`")
                .cause()
                .isInstanceOf(TableAlreadyExistException.class)
                .hasMessage("Table (or view) default.T already exists in Catalog PAIMON_REGISTER.");
        checkTopicNotExist("T1");

        // Creating a table in a missing database must fail without registering topic 'T1'.
        Assertions.assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        String.format(
                                                "CREATE TABLE NOT_EXIST.T (i INT, j INT) WITH ("
                                                        + "'log.system'='kafka', "
                                                        + "'log.system.partitions'='2', "
                                                        + "'kafka.bootstrap.servers'='%s', "
                                                        + "'kafka.topic'='T1')",
                                                getBootstrapServers())))
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        "Could not execute CreateTable in path `PAIMON_REGISTER`.`NOT_EXIST`.`T`")
                .cause()
                .isInstanceOf(DatabaseNotExistException.class)
                .hasMessage("Database NOT_EXIST does not exist in Catalog PAIMON_REGISTER.");
        checkTopicNotExist("T1");

        // Table T was registered with topic 'Tt'; that is the topic that must be gone after the
        // drop. (Checking 'T' here would pass even if nothing was deleted.)
        tEnv.executeSql("DROP TABLE T");
        checkTopicNotExist("Tt");
    }

    /**
     * Verifies that {@code table-default.*} catalog options (bootstrap servers and partition
     * count) are applied when a table is created with auto-registration enabled.
     */
    @Timeout(60)
    @Test
    public void testLogSystemAutoRegisterWithDefaultOption() {
        tEnv.executeSql(
                String.format(
                        "CREATE CATALOG PAIMON_DEFAULT WITH ('type'='paimon', 'warehouse'='%s', 'log.system.auto-register'='true', 'table-default.kafka.bootstrap.servers'='%s','table-default.log.system.partitions'='2')",
                        getTempDirPath(), getBootstrapServers()));
        tEnv.useCatalog("PAIMON_DEFAULT");
        env.getCheckpointConfig().disableCheckpointing();
        env.setParallelism(1);

        // The table DDL deliberately carries no bootstrap servers: they come from the catalog's
        // table-default options, so there is nothing to format into this statement.
        tEnv.executeSql(
                "CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'kafka.topic'='T')");
        checkTopicExists("T", 2, 1);
    }

    /** Writes two rows to a Kafka-backed table and reads them back. */
    @Test
    public void testLogWriteRead() throws Exception {
        String topic = UUID.randomUUID().toString();
        try {
            tEnv.executeSql(
                    String.format(
                            "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ('log.system'='kafka', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')",
                            getBootstrapServers(), topic));
            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();

            Assertions.assertThat(
                            BlockingIterator.of(tEnv.from("T").execute().collect())
                                    .collectAndClose(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        } finally {
            deleteTopicIfExists(topic);
        }
    }

    /** Same as {@link #testLogWriteRead()} but with an additional computed column. */
    @Test
    public void testLogWriteReadWithVirtual() throws Exception {
        String topic = UUID.randomUUID().toString();
        createTopicIfNotExists(topic, 1);
        try {
            tEnv.executeSql(
                    String.format(
                            "CREATE TABLE T (a STRING, b STRING, c STRING, d AS CAST(c as INT) + 1) WITH ('log.system'='kafka', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')",
                            getBootstrapServers(), topic));
            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();

            Assertions.assertThat(
                            BlockingIterator.of(tEnv.from("T").execute().collect())
                                    .collectAndClose(2))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
        } finally {
            deleteTopicIfExists(topic);
        }
    }

    /**
     * Cancels the insert job (if it exposes a job client) and then closes the result iterator.
     *
     * @param insertJob result handle of the submitted {@code INSERT} statement
     * @param rows iterator over the query result; may be {@code null} if it was never created
     * @throws Exception if closing the iterator fails
     */
    private static void stopJobs(TableResult insertJob, BlockingIterator<Row, Row> rows)
            throws Exception {
        try {
            insertJob.getJobClient().ifPresent(JobClient::cancel);
        } finally {
            if (rows != null) {
                rows.close();
            }
        }
    }
}
