package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/CatalogTableITCase.class */
public class CatalogTableITCase extends CatalogITCaseBase {
    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected boolean inferScanParallelism() {
        return true;
    }

    @Test
    public void testNotExistMetadataTable() {
        Assertions.assertThatThrownBy(() -> {
            sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots", new Object[0]);
        }).hasMessageContaining("Object 'T$snapshots' not found");
    }

    @Test
    public void testSnapshotsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        Assertions.assertThat(sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{1L, 0L, "APPEND"}), Row.of(new Object[]{2L, 0L, "APPEND"})});
    }

    @Test
    public void testSnapshotsTableWithRecordCount() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        Assertions.assertThat(sql("SELECT snapshot_id, total_record_count, delta_record_count, changelog_record_count FROM T$snapshots", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1L, 1L, 1L, 0L}), Row.of(new Object[]{2L, 2L, 1L, 0L})});
    }

    @Test
    public void testOptionsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM T$options", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{"snapshot.time-retained", "5 h"})});
    }

    @Test
    public void testAllTableOptions() {
        sql("CREATE TABLE T (a INT, b INT) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')", new Object[0]);
        sql("ALTER TABLE T SET ('c.cc.ccc' = 'val3')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM sys.all_table_options", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{FlinkTestBase.CURRENT_DATABASE, "T", "a.aa.aaa", "val1"}), Row.of(new Object[]{FlinkTestBase.CURRENT_DATABASE, "T", "b.bb.bbb", "val2"}), Row.of(new Object[]{FlinkTestBase.CURRENT_DATABASE, "T", "c.cc.ccc", "val3"})});
    }

    @Test
    public void testDropSystemDatabase() {
        Assertions.assertThatCode(() -> {
            sql("DROP DATABASE sys", new Object[0]);
        }).hasRootCauseMessage("Can't do operation on system database.");
    }

    @Test
    public void testCreateSystemDatabase() {
        Assertions.assertThatCode(() -> {
            sql("CREATE DATABASE sys", new Object[0]);
        }).hasRootCauseMessage("Can't do operation on system database.");
    }

    @Test
    public void testChangeTableInSystemDatabase() {
        sql("USE sys", new Object[0]);
        Assertions.assertThatCode(() -> {
            sql("ALTER TABLE all_table_options SET ('bucket-num' = '5')", new Object[0]);
        }).hasRootCauseMessage("Can't alter system table.");
    }

    @Test
    public void testSystemDatabase() {
        sql("USE sys", new Object[0]);
        Assertions.assertThat(sql("SHOW TABLES", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{"all_table_options"})});
    }

    @Test
    public void testCreateSystemTable() {
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE T$snapshots (a INT, b INT)", new Object[0]);
        }).hasRootCauseMessage("Cannot 'createTable' for system table 'Identifier{database='default', table='T$snapshots'}', please use data table.");
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE T$aa$bb (a INT, b INT)", new Object[0]);
        }).hasRootCauseMessage("Cannot 'createTable' for system table 'Identifier{database='default', table='T$aa$bb'}', please use data table.");
    }

    @Test
    public void testManifestsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("SELECT schema_id, file_name, file_size FROM T$manifests", new Object[0]).forEach(row -> {
            Assertions.assertThat(((Long) row.getField(0)).longValue()).isEqualTo(0L);
            Assertions.assertThat(StringUtils.startsWith((String) row.getField(1), "manifest")).isTrue();
            Assertions.assertThat(((Long) row.getField(2)).longValue()).isGreaterThan(0L);
        });
    }

    @Test
    public void testManifestsTableWithFileCount() {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        Assertions.assertThat(sql("SELECT num_added_files, num_deleted_files FROM T$manifests", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1L, 0L}), Row.of(new Object[]{1L, 0L})});
    }

    @Test
    public void testSchemasTable() throws Exception {
        sql("CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')", new Object[0]);
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')", new Object[0]);
        Assertions.assertThat(sql("SHOW CREATE TABLE T$schemas", new Object[0]).toString()).isEqualTo("[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n  `schema_id` BIGINT NOT NULL,\n  `fields` VARCHAR(2147483647) NOT NULL,\n  `partition_keys` VARCHAR(2147483647) NOT NULL,\n  `primary_keys` VARCHAR(2147483647) NOT NULL,\n  `options` VARCHAR(2147483647) NOT NULL,\n  `comment` VARCHAR(2147483647),\n  `update_time` TIMESTAMP(3) NOT NULL\n) ]]");
        Assertions.assertThat(sql("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM T$schemas order by schema_id", new Object[0]).toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], {\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ], +I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], {\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ]]");
    }

    @Test
    public void testSnapshotsSchemasTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')", new Object[0]);
        sql("INSERT INTO T VALUES (5, 6)", new Object[0]);
        sql("INSERT INTO T VALUES (7, 8)", new Object[0]);
        Assertions.assertThat((List) sql("SELECT s.snapshot_id, s.schema_id, t.fields FROM T$snapshots s JOIN T$schemas t ON s.schema_id=t.schema_id", new Object[0]).stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList())).containsExactlyInAnyOrder(new String[]{"+I[1, 0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]", "+I[2, 0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]", "+I[3, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]", "+I[4, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]"});
    }

    @Test
    public void testCreateTableLike() throws Exception {
        sql("CREATE TABLE T (a INT)", new Object[0]);
        sql("CREATE TABLE T1 LIKE T", new Object[0]);
        Assertions.assertThat(sql("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM T1$schemas s", new Object[0]).toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
    }

    @Test
    public void testCreateTableAs() throws Exception {
        sql("CREATE TABLE t (a INT)", new Object[0]);
        sql("INSERT INTO t VALUES(1),(2)", new Object[0]);
        sql("CREATE TABLE t1 AS SELECT * FROM t", new Object[0]);
        Assertions.assertThat(sql("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM t1$schemas s", new Object[0]).toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
        Assertions.assertThat(sql("SELECT * FROM t1", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1}), Row.of(new Object[]{2})});
        sql("CREATE TABLE t_p (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh)", new Object[0]);
        sql("INSERT INTO t_p SELECT 1,2,'a','2023-02-19','12'", new Object[0]);
        sql("CREATE TABLE t1_p WITH ('partition' = 'dt' ) AS SELECT * FROM t_p", new Object[0]);
        Assertions.assertThat(sql("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM t1_p$schemas s", new Object[0]).toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"},{\"id\":3,\"name\":\"dt\",\"type\":\"STRING\"},{\"id\":4,\"name\":\"hh\",\"type\":\"STRING\"}], [\"dt\"], [], {}, ]]");
        Assertions.assertThat(sql("SELECT * FROM t1_p", new Object[0]).toString()).isEqualTo("[+I[1, 2, a, 2023-02-19, 12]]");
        sql("CREATE TABLE t_option (a INT) WITH ('file.format' = 'orc')", new Object[0]);
        sql("INSERT INTO t_option VALUES(1),(2)", new Object[0]);
        sql("CREATE TABLE t1_option WITH ('file.format' = 'parquet') AS SELECT * FROM t_option", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t1_option$options", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{"file.format", "parquet"})});
        Assertions.assertThat(sql("SELECT * FROM t1_option", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1}), Row.of(new Object[]{2})});
        sql("CREATE TABLE t_pk (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)", new Object[0]);
        sql("INSERT INTO t_pk VALUES(1,2,'aaa','2020-01-02','09')", new Object[0]);
        sql("CREATE TABLE t_pk_as WITH ('primary-key' = 'dt') AS SELECT * FROM t_pk", new Object[0]);
        Assertions.assertThat(sql("SHOW CREATE TABLE t_pk_as", new Object[0]).toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`)"});
        Assertions.assertThat(sql("SELECT * FROM t_pk_as", new Object[0]).toString()).isEqualTo("[+I[1, 2, aaa, 2020-01-02, 09]]");
        sql("CREATE TABLE t_all (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n) PARTITIONED BY (dt, hh)", new Object[0]);
        sql("INSERT INTO t_all VALUES(1,2,'login','2020-01-02','09')", new Object[0]);
        sql("CREATE TABLE t_all_as WITH ('primary-key' = 'dt,hh' , 'partition' = 'dt' ) AS SELECT * FROM t_all", new Object[0]);
        List<Row> sql = sql("SHOW CREATE TABLE t_all_as", new Object[0]);
        Assertions.assertThat(sql.toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`, `hh`)"});
        Assertions.assertThat(sql.toString()).contains(new CharSequence[]{"PARTITIONED BY (`dt`)"});
        Assertions.assertThat(sql("SELECT * FROM t_all_as", new Object[0]).toString()).isEqualTo("[+I[1, 2, login, 2020-01-02, 09]]");
        sql("CREATE TABLE t_pk_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)", new Object[0]);
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist", new Object[0]);
        }).hasRootCauseMessage("Table column [user_id, item_id, behavior, dt, hh] should include all primary key constraint [aaa]");
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE t_pk_ddl_option (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) WITH ('primary-key' = 'dt')", new Object[0]);
        }).hasRootCauseMessage("Cannot define primary key on DDL and table options at the same time.");
        sql("CREATE TABLE t_partition_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh) ", new Object[0]);
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist", new Object[0]);
        }).hasRootCauseMessage("Table column [user_id, item_id, behavior, dt, hh] should include all partition fields [aaa]");
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE t_partition_ddl_option (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING) PARTITIONED BY (dt, hh)  WITH ('partition' = 'dt')", new Object[0]);
        }).hasRootCauseMessage("Cannot define partition on DDL and table options at the same time.");
    }

    @Test
    public void testConflictOption() {
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE T (a INT) WITH ('write-mode' = 'append-only', 'changelog-producer' = 'input')", new Object[0]);
        }).getRootCause().isInstanceOf(UnsupportedOperationException.class).hasMessage("Can not set the write-mode to append-only and changelog-producer at the same time.");
        sql("CREATE TABLE T (a INT) WITH ('write-mode' = 'append-only')", new Object[0]);
        Assertions.assertThatThrownBy(() -> {
            sql("ALTER TABLE T SET ('changelog-producer'='input')", new Object[0]);
        }).getRootCause().isInstanceOf(UnsupportedOperationException.class).hasMessage("Can not set the write-mode to append-only and changelog-producer at the same time.");
    }

    @Test
    public void testChangelogProducerOnAppendOnlyTable() {
        Assertions.assertThatThrownBy(() -> {
            sql("CREATE TABLE T (a INT) WITH ('changelog-producer' = 'input')", new Object[0]);
        }).getRootCause().isInstanceOf(UnsupportedOperationException.class).hasMessage("Can not set changelog-producer on table without primary keys, please define primary keys.");
        sql("CREATE TABLE T (a INT)", new Object[0]);
        Assertions.assertThatThrownBy(() -> {
            sql("ALTER TABLE T SET ('changelog-producer'='input')", new Object[0]);
        }).getRootCause().isInstanceOf(UnsupportedOperationException.class).hasMessage("Can not set changelog-producer on table without primary keys, please define primary keys.");
    }

    @Test
    public void testFileFormatPerLevel() {
        sql("CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('num-sorted-run.compaction-trigger'='2','file.format.per.level' = '0:avro,3:parquet', 'num-levels' = '4')", new Object[0]);
        sql("INSERT INTO T1 SELECT 1,'AAA'", new Object[0]);
        sql("INSERT INTO T1 SELECT 2,'BBB'", new Object[0]);
        sql("INSERT INTO T1 SELECT 3,'CCC'", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM T1", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{2, "BBB"}), Row.of(new Object[]{3, "CCC"})});
        Assertions.assertThat(sql("SELECT level,file_format FROM T1$files", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{3, "parquet"}), Row.of(new Object[]{0, "avro"})});
    }

    @Test
    public void testFilesTable() throws Exception {
        sql("CREATE TABLE T_VALUE_COUNT (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p) WITH ('write-mode'='change-log')", new Object[0]);
        assertFilesTable("T_VALUE_COUNT");
        sql("CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p) WITH ('write-mode'='change-log')", new Object[0]);
        assertFilesTable("T_WITH_KEY");
        sql("CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p) WITH ('write-mode'='append-only')", new Object[0]);
        assertFilesTable("T_APPEND_ONLY");
    }

    private void assertFilesTable(String str) throws Exception {
        Assertions.assertThat(sql(String.format("SELECT * FROM %s$files", str), new Object[0])).isEmpty();
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), new Path(this.path, String.format("default.db/%s", str)));
        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 1, 2, 'S1')", str), new Object[0]);
        schemaManager.commitChanges(Arrays.asList(SchemaChange.addColumn("d", new IntType()), SchemaChange.addColumn("e", new IntType()), SchemaChange.addColumn("f", new IntType())));
        sql(String.format("INSERT INTO %s VALUES (5, 1, 6, 'S3', 7, 8, 9), (10, 1, 11, 'S4', 12, 13, 14)", str), new Object[0]);
        schemaManager.commitChanges(Arrays.asList(SchemaChange.dropColumn("c"), SchemaChange.dropColumn("e"), SchemaChange.renameColumn("b", "bb"), SchemaChange.renameColumn("d", "dd")));
        sql(String.format("INSERT INTO %s VALUES (19, 1, 20, 21, 22), (15, 1, 16, 17, 18), (23, 2, 24, 25, 26), (27, 2, 28, 29, 30)", str), new Object[0]);
        List<Row> sql = sql(String.format("SELECT * FROM %s$files", str), new Object[0]);
        for (Row row : sql) {
            Assertions.assertThat(StringUtils.endsWith((String) row.getField(2), ".orc")).isTrue();
            Assertions.assertThat(((Long) row.getField(7)).longValue()).isGreaterThan(0L);
        }
        ListAssert assertThat = Assertions.assertThat(getRowStringList(sql));
        String[] strArr = new String[4];
        Object[] objArr = new Object[1];
        objArr[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[23, 2, 24, 25, 26],[27, 2, 28, 29, 30]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[23],[27]";
        strArr[0] = String.format("[2],0,orc,2,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=23, bb=24, dd=25, f=26, p=2},{a=27, bb=28, dd=29, f=30, p=2}", objArr);
        Object[] objArr2 = new Object[1];
        objArr2[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[1, 1, 2, S1],[3, 1, 4, S2]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[1],[3]";
        strArr[1] = String.format("[1],0,orc,0,0,2,%s,{a=0, bb=0, dd=2, f=2, p=0},{a=1, bb=2, dd=null, f=null, p=1},{a=3, bb=4, dd=null, f=null, p=1}", objArr2);
        Object[] objArr3 = new Object[1];
        objArr3[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[5, 1, 6, S3, 7, 8, 9],[10, 1, 11, S4, 12, 13, 14]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[5],[10]";
        strArr[2] = String.format("[1],0,orc,1,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=5, bb=6, dd=7, f=9, p=1},{a=10, bb=11, dd=12, f=14, p=1}", objArr3);
        Object[] objArr4 = new Object[1];
        objArr4[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[15, 1, 16, 17, 18],[19, 1, 20, 21, 22]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[15],[19]";
        strArr[3] = String.format("[1],0,orc,2,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=15, bb=16, dd=17, f=18, p=1},{a=19, bb=20, dd=21, f=22, p=1}", objArr4);
        assertThat.containsExactlyInAnyOrder(strArr);
        List<Row> sql2 = sql(String.format("SELECT * FROM %s$files /*+ OPTIONS('scan.snapshot-id'='2') */", str), new Object[0]);
        for (Row row2 : sql2) {
            Assertions.assertThat(StringUtils.endsWith((String) row2.getField(2), ".orc")).isTrue();
            Assertions.assertThat(((Long) row2.getField(7)).longValue()).isGreaterThan(0L);
        }
        ListAssert assertThat2 = Assertions.assertThat(getRowStringList(sql2));
        String[] strArr2 = new String[2];
        Object[] objArr5 = new Object[1];
        objArr5[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[1, 1, 2, S1],[3, 1, 4, S2]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[1],[3]";
        strArr2[0] = String.format("[1],0,orc,0,0,2,%s,{a=0, b=0, c=0, d=2, e=2, f=2, p=0},{a=1, b=2, c=S1, d=null, e=null, f=null, p=1},{a=3, b=4, c=S2, d=null, e=null, f=null, p=1}", objArr5);
        Object[] objArr6 = new Object[1];
        objArr6[0] = StringUtils.endsWith(str, "VALUE_COUNT") ? "[5, 1, 6, S3, 7, 8, 9],[10, 1, 11, S4, 12, 13, 14]" : StringUtils.endsWith(str, "APPEND_ONLY") ? "," : "[5],[10]";
        strArr2[1] = String.format("[1],0,orc,1,0,2,%s,{a=0, b=0, c=0, d=0, e=0, f=0, p=0},{a=5, b=6, c=S3, d=7, e=8, f=9, p=1},{a=10, b=11, c=S4, d=12, e=13, f=14, p=1}", objArr6);
        assertThat2.containsExactlyInAnyOrder(strArr2);
    }

    @Nonnull
    private List<String> getRowStringList(List<Row> list) {
        return (List) list.stream().map(row -> {
            return StringUtils.join(new Object[]{row.getField(0), row.getField(1), row.getField(3), row.getField(4), row.getField(5), row.getField(6), row.getField(8), row.getField(9), row.getField(10), row.getField(11), row.getField(12)}, ",");
        }).collect(Collectors.toList());
    }

    @Test
    public void testTagsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        sql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        sql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        paimonTable("T").createTag("tag1", 1L);
        paimonTable("T").createTag("tag2", 2L);
        Assertions.assertThat(sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{"tag1", 1L, 0L, 1L}), Row.of(new Object[]{"tag2", 2L, 0L, 2L})});
    }

    @Test
    public void testConsumersTable() throws Exception {
        batchSql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        batchSql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        BlockingIterator of = BlockingIterator.of(streamSqlIter("SELECT * FROM T /*+ OPTIONS('consumer-id'='my1') */", new Object[0]));
        batchSql("INSERT INTO T VALUES (5, 6), (7, 8)", new Object[0]);
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2}), Row.of(new Object[]{3, 4})});
        of.close();
        Assertions.assertThat(sql("SELECT * FROM T$consumers", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{"my1", 3L})});
    }

    @Test
    public void testPartitionsTable() throws Exception {
        sql("CREATE TABLE T_VALUE_COUNT (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p) WITH ('write-mode'='change-log')", new Object[0]);
        assertFilesTable("T_VALUE_COUNT");
        sql("CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p) WITH ('write-mode'='change-log')", new Object[0]);
        assertFilesTable("T_WITH_KEY");
        sql("CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p) WITH ('write-mode'='append-only')", new Object[0]);
        assertPartitionsTable("T_APPEND_ONLY");
    }

    private void assertPartitionsTable(String str) throws Exception {
        Assertions.assertThat(sql(String.format("SELECT * FROM %s$partitions", str), new Object[0])).isEmpty();
        new SchemaManager(LocalFileIO.create(), new Path(this.path, String.format("default.db/%s", str)));
        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1')", str), new Object[0]);
        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", str), new Object[0]);
        for (Row row : sql(String.format("SELECT * FROM %s$partitions", str), new Object[0])) {
            Assertions.assertThat((String) row.getField(0)).containsAnyOf(new CharSequence[]{"[1]", "[2]"});
            Assertions.assertThat(((Long) row.getField(2)).longValue()).isGreaterThan(0L);
        }
        sql(String.format("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", str), new Object[0]);
        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", str), new Object[0]);
        Iterator<Row> it = sql(String.format("SELECT * FROM %s$partitions", str), new Object[0]).iterator();
        while (it.hasNext()) {
            Assertions.assertThat((String) it.next().getField(0)).containsAnyOf(new CharSequence[]{"[1]", "[2]", "[3]", "[4]"});
        }
    }
}
