package org.apache.flink.table.store.connector;

import java.io.IOException;
import java.util.UUID;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/FileSystemCatalogITCase.class */
public class FileSystemCatalogITCase extends KafkaTableTestBase {
    @Before
    public void before() throws IOException {
        this.tEnv.executeSql(String.format("CREATE CATALOG fs WITH ('type'='table-store', 'warehouse'='%s')", TEMPORARY_FOLDER.newFolder().toURI().toString()));
        this.tEnv.useCatalog("fs");
        this.env.setParallelism(1);
    }

    @Test
    public void testWriteRead() throws Exception {
        this.tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
        innerTestWriteRead();
    }

    @Test
    public void testLogWriteRead() throws Exception {
        String uuid = UUID.randomUUID().toString();
        createTopicIfNotExists(uuid, 1);
        try {
            this.tEnv.executeSql(String.format("CREATE TABLE T (a STRING, b STRING, c STRING) WITH ('log.system'='kafka', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')", getBootstrapServers(), uuid));
            innerTestWriteRead();
        } finally {
            deleteTopicIfExists(uuid);
        }
    }

    @Test
    public void testLogWriteReadWithVirtual() throws Exception {
        String uuid = UUID.randomUUID().toString();
        createTopicIfNotExists(uuid, 1);
        try {
            this.tEnv.executeSql(String.format("CREATE TABLE T (a STRING, b STRING, c STRING, d AS CAST(c as INT) + 1) WITH ('log.system'='kafka', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')", getBootstrapServers(), uuid));
            BlockingIterator of = BlockingIterator.of(this.tEnv.from("T").execute().collect());
            this.tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
            Assertions.assertThat(of.collectAndClose(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3", 4}), Row.of(new Object[]{"4", "5", "6", 7})});
            deleteTopicIfExists(uuid);
        } catch (Throwable th) {
            deleteTopicIfExists(uuid);
            throw th;
        }
    }

    private void innerTestWriteRead() throws Exception {
        BlockingIterator of = BlockingIterator.of(this.tEnv.from("T").execute().collect());
        this.tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
        Assertions.assertThat(of.collectAndClose(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3"}), Row.of(new Object[]{"4", "5", "6"})});
    }
}
