package org.apache.flink.table.planner.operations;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.SourceQueryOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.expressions.utils.Func1$;
import org.apache.flink.table.planner.expressions.utils.Func8$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.OperationMatchers;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.DataType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.HamcrestCondition;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.class */
public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest$TestItem.class */
    public static class TestItem {
        private final String testExpr;

        @Nullable
        private Object expectedType;

        @Nullable
        private String expectedError;

        private TestItem(String str) {
            this.testExpr = str;
        }

        static TestItem fromTestExpr(String str) {
            return new TestItem(str);
        }

        TestItem withExpectedType(Object obj) {
            this.expectedType = obj;
            return this;
        }

        TestItem withExpectedError(String str) {
            this.expectedError = str;
            return this;
        }

        public String toString() {
            return this.testExpr;
        }
    }

    @Test
    public void testCreateDatabase() {
        String[] strArr = {"create database db1", "create database if not exists cat1.db1", "create database cat1.db1 comment 'db1_comment'", "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'K2' = 'V2')"};
        String[] strArr2 = {"builtin", "cat1", "cat1", "cat1"};
        String[] strArr3 = {null, null, "db1_comment", "db1_comment"};
        boolean[] zArr = {false, true, false, false};
        HashMap hashMap = new HashMap();
        hashMap.put("k1", "v1");
        hashMap.put("K2", "V2");
        Map[] mapArr = {new HashMap(), new HashMap(), new HashMap(), new HashMap(hashMap)};
        for (int i = 0; i < strArr.length; i++) {
            CreateDatabaseOperation parse = parse(strArr[i]);
            Assertions.assertThat(parse).isInstanceOf(CreateDatabaseOperation.class);
            CreateDatabaseOperation createDatabaseOperation = parse;
            Assertions.assertThat(createDatabaseOperation.getCatalogName()).isEqualTo(strArr2[i]);
            Assertions.assertThat(createDatabaseOperation.getDatabaseName()).isEqualTo("db1");
            Assertions.assertThat(createDatabaseOperation.getCatalogDatabase().getComment()).isEqualTo(strArr3[i]);
            Assertions.assertThat(createDatabaseOperation.isIgnoreIfExists()).isEqualTo(zArr[i]);
            Assertions.assertThat(createDatabaseOperation.getCatalogDatabase().getProperties()).isEqualTo(mapArr[i]);
        }
    }

    @Test
    public void testDropDatabase() {
        String[] strArr = {"drop database db1", "drop database if exists db1", "drop database if exists cat1.db1 CASCADE", "drop database if exists cat1.db1 RESTRICT"};
        String[] strArr2 = {"builtin", "builtin", "cat1", "cat1"};
        boolean[] zArr = {false, true, true, true};
        boolean[] zArr2 = {false, false, true, false};
        for (int i = 0; i < strArr.length; i++) {
            DropDatabaseOperation parse = parse(strArr[i]);
            Assertions.assertThat(parse).isInstanceOf(DropDatabaseOperation.class);
            DropDatabaseOperation dropDatabaseOperation = parse;
            Assertions.assertThat(dropDatabaseOperation.getCatalogName()).isEqualTo(strArr2[i]);
            Assertions.assertThat(dropDatabaseOperation.getDatabaseName()).isEqualTo("db1");
            Assertions.assertThat(dropDatabaseOperation.isIfExists()).isEqualTo(zArr[i]);
            Assertions.assertThat(dropDatabaseOperation.isCascade()).isEqualTo(zArr2[i]);
        }
    }

    @Test
    public void testAlterDatabase() throws Exception {
        this.catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
        this.catalogManager.createDatabase("cat1", "db1", new CatalogDatabaseImpl(new HashMap(), "db1_comment"), true);
        AlterDatabaseOperation parse = parse("alter database cat1.db1 set ('k1'='v1', 'K2'='V2')");
        Assertions.assertThat(parse).isInstanceOf(AlterDatabaseOperation.class);
        HashMap hashMap = new HashMap();
        hashMap.put("k1", "v1");
        hashMap.put("K2", "V2");
        AlterDatabaseOperation alterDatabaseOperation = parse;
        Assertions.assertThat(alterDatabaseOperation.getDatabaseName()).isEqualTo("db1");
        Assertions.assertThat(alterDatabaseOperation.getCatalogName()).isEqualTo("cat1");
        Assertions.assertThat(alterDatabaseOperation.getCatalogDatabase().getComment()).isEqualTo("db1_comment");
        Assertions.assertThat(alterDatabaseOperation.getCatalogDatabase().getProperties()).isEqualTo(hashMap);
    }

    @Test
    public void testCreateTable() {
        CreateTableOperation parse = parse("CREATE TABLE tbl1 (\n  a bigint comment 'column a',\n  b varchar, \n  c int, \n  d varchar)\n  PARTITIONED BY (a, d)\n  with (\n    'connector' = 'kafka', \n    'kafka.topic' = 'log.test'\n)\n", getPlannerBySqlDialect(SqlDialect.DEFAULT), getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse).isInstanceOf(CreateTableOperation.class);
        ResolvedCatalogTable catalogTable = parse.getCatalogTable();
        Assertions.assertThat(catalogTable.getPartitionKeys()).hasSameElementsAs(Arrays.asList("a", "d"));
        Assertions.assertThat(catalogTable.getSchema().getFieldNames()).isEqualTo(new String[]{"a", "b", "c", "d"});
        Assertions.assertThat(catalogTable.getSchema().getFieldDataTypes()).isEqualTo(new DataType[]{DataTypes.BIGINT(), DataTypes.VARCHAR(Integer.MAX_VALUE), DataTypes.INT(), DataTypes.VARCHAR(Integer.MAX_VALUE)});
        Assertions.assertThat(catalogTable).isInstanceOf(ResolvedCatalogTable.class);
        catalogTable.getResolvedSchema().getColumn(0).ifPresent(column -> {
            Assertions.assertThat(column.getComment()).isEqualTo(Optional.of("column a"));
        });
    }

    @Test
    public void testCreateTableWithPrimaryKey() {
        CreateTableOperation parse = parse("CREATE TABLE tbl1 (\n  a bigint,\n  b varchar, \n  c int, \n  d varchar, \n  constraint ct1 primary key(a, b) not enforced\n) with (\n  'connector' = 'kafka', \n  'kafka.topic' = 'log.test'\n)\n", getPlannerBySqlDialect(SqlDialect.DEFAULT), getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse).isInstanceOf(CreateTableOperation.class);
        TableSchema schema = parse.getCatalogTable().getSchema();
        Assertions.assertThat((String) schema.getPrimaryKey().map((v0) -> {
            return v0.asSummaryString();
        }).orElse("fakeVal")).isEqualTo("CONSTRAINT ct1 PRIMARY KEY (a, b)");
        Assertions.assertThat(schema.getFieldNames()).isEqualTo(new String[]{"a", "b", "c", "d"});
        Assertions.assertThat(schema.getFieldDataTypes()).isEqualTo(new DataType[]{(DataType) DataTypes.BIGINT().notNull(), (DataType) DataTypes.STRING().notNull(), DataTypes.INT(), DataTypes.STRING()});
    }

    @Test
    public void testPrimaryKeyOnGeneratedColumn() {
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("CREATE TABLE tbl1 (\n  a bigint not null,\n  b varchar not null,\n  c as 2 * (a + 1),\n  constraint ct1 primary key (b, c) not enforced) with (\n    'connector' = 'kafka',\n    'kafka.topic' = 'log.test'\n)\n");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Could not create a PRIMARY KEY with column 'c' at line 5, column 34.\nA PRIMARY KEY constraint must be declared on physical columns.");
    }

    @Test
    public void testPrimaryKeyNonExistentColumn() {
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("CREATE TABLE tbl1 (\n  a bigint not null,\n  b varchar not null,\n  c as 2 * (a + 1),\n  constraint ct1 primary key (b, d) not enforced) with (\n    'connector' = 'kafka',\n    'kafka.topic' = 'log.test'\n)\n");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Primary key column 'd' is not defined in the schema at line 5, column 34");
    }

    @Test
    public void testCreateTableWithMinusInOptionKey() {
        FlinkPlannerImpl plannerBySqlDialect = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        SqlNode parse = getParserBySqlDialect(SqlDialect.DEFAULT).parse("create table source_table(\n  a int,\n  b bigint,\n  c varchar\n) with (\n  'a-B-c-d124' = 'Ab',\n  'a.b-c-d.e-f.g' = 'ada',\n  'a.b-c-d.e-f1231.g' = 'ada',\n  'a.b-c-d.*' = 'adad')\n");
        Assertions.assertThat(parse).isInstanceOf(SqlCreateTable.class);
        CreateTableOperation createTableOperation = (Operation) SqlNodeToOperationConversion.convert(plannerBySqlDialect, this.catalogManager, parse).get();
        Assertions.assertThat(createTableOperation).isInstanceOf(CreateTableOperation.class);
        Assertions.assertThat(new TreeMap((Map) createTableOperation.getCatalogTable().getOptions().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).toString()).isEqualTo("{a-B-c-d124=Ab, a.b-c-d.*=adad, a.b-c-d.e-f.g=ada, a.b-c-d.e-f1231.g=ada}");
    }

    @Test
    public void testCreateTableWithWatermark() throws FunctionAlreadyExistException, DatabaseNotExistException {
        this.catalog.createFunction(ObjectPath.fromString("default.myfunc"), new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName()), true);
        FlinkPlannerImpl plannerBySqlDialect = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        SqlNode parse = getParserBySqlDialect(SqlDialect.DEFAULT).parse("create table source_table(\n  a int,\n  b bigint,\n  c timestamp(3),\n  watermark for `c` as myfunc(c, 1) - interval '5' second\n) with (\n  'connector.type' = 'kafka')\n");
        Assertions.assertThat(parse).isInstanceOf(SqlCreateTable.class);
        CreateTableOperation createTableOperation = (Operation) SqlNodeToOperationConversion.convert(plannerBySqlDialect, this.catalogManager, parse).get();
        Assertions.assertThat(createTableOperation).isInstanceOf(CreateTableOperation.class);
        Map properties = createTableOperation.getCatalogTable().toProperties();
        HashMap hashMap = new HashMap();
        hashMap.put("schema.0.name", "a");
        hashMap.put("schema.0.data-type", "INT");
        hashMap.put("schema.1.name", "b");
        hashMap.put("schema.1.data-type", "BIGINT");
        hashMap.put("schema.2.name", "c");
        hashMap.put("schema.2.data-type", "TIMESTAMP(3)");
        hashMap.put("schema.watermark.0.rowtime", "c");
        hashMap.put("schema.watermark.0.strategy.expr", "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
        hashMap.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        hashMap.put("connector.type", "kafka");
        Assertions.assertThat(properties).isEqualTo(hashMap);
    }

    @Test
    public void testBasicCreateTableLike() {
        HashMap hashMap = new HashMap();
        hashMap.put("format.type", "json");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).build(), (String) null, Collections.emptyList(), hashMap), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThat(parseAndConvert("create table derivedTable(\n  a int,\n  watermark for f1 as `f1` - interval '5' second\n)\nPARTITIONED BY (a, f0)\nwith (\n  'connector.type' = 'kafka')\nlike sourceTable")).is(new HamcrestCondition(OperationMatchers.isCreateTableOperation(OperationMatchers.withSchema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).column("a", DataTypes.INT()).watermark("f1", "`f1` - INTERVAL '5' SECOND").build()), OperationMatchers.withOptions(OperationMatchers.entry("connector.type", "kafka"), OperationMatchers.entry("format.type", "json")), OperationMatchers.partitionedBy("a", "f0"))));
    }

    @Test
    public void testCreateTableLikeWithFullPath() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.type", "kafka");
        hashMap.put("format.type", "json");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).build(), (String) null, Collections.emptyList(), hashMap), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThat(parseAndConvert("create table mytable like `builtin`.`default`.sourceTable")).is(new HamcrestCondition(OperationMatchers.isCreateTableOperation(OperationMatchers.withSchema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).build()), OperationMatchers.withOptions(OperationMatchers.entry("connector.type", "kafka"), OperationMatchers.entry("format.type", "json")))));
    }

    @Test
    public void testMergingCreateTableLike() {
        HashMap hashMap = new HashMap();
        hashMap.put("format.type", "json");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).columnByExpression("f2", "`f0` + 12345").watermark("f1", "`f1` - interval '1' second").build(), (String) null, Arrays.asList("f0", "f1"), hashMap), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThat(parseAndConvert("create table derivedTable(\n  a int,\n  watermark for f1 as `f1` - interval '5' second\n)\nPARTITIONED BY (a, f0)\nwith (\n  'connector.type' = 'kafka')\nlike sourceTable (\n   EXCLUDING GENERATED\n   EXCLUDING PARTITIONS\n   OVERWRITING OPTIONS\n   OVERWRITING WATERMARKS)")).is(new HamcrestCondition(OperationMatchers.isCreateTableOperation(OperationMatchers.withSchema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.TIMESTAMP(3)).column("a", DataTypes.INT()).watermark("f1", "`f1` - INTERVAL '5' SECOND").build()), OperationMatchers.withOptions(OperationMatchers.entry("connector.type", "kafka"), OperationMatchers.entry("format.type", "json")), OperationMatchers.partitionedBy("a", "f0"))));
    }

    @Test
    public void testCreateTableInvalidPartition() {
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("create table derivedTable(\n  a int\n)\nPARTITIONED BY (f3)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Partition column 'f3' not defined in the table schema. Available columns: ['a']");
    }

    @Test
    public void testCreateTableLikeInvalidPartition() {
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(), (String) null, Collections.emptyList(), Collections.emptyMap()), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("create table derivedTable(\n  a int\n)\nPARTITIONED BY (f3)\nlike sourceTable");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Partition column 'f3' not defined in the table schema. Available columns: ['f0', 'a']");
    }

    @Test
    public void testCreateTableInvalidWatermark() {
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("create table derivedTable(\n  a int,\n  watermark for f1 as `f1` - interval '5' second\n)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The rowtime attribute field 'f1' is not defined in the table schema, at line 3, column 17\nAvailable fields: ['a']");
    }

    @Test
    public void testCreateTableLikeInvalidWatermark() {
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(), (String) null, Collections.emptyList(), Collections.emptyMap()), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("create table derivedTable(\n  a int,\n  watermark for f1 as `f1` - interval '5' second\n)\nlike sourceTable");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The rowtime attribute field 'f1' is not defined in the table schema, at line 3, column 17\nAvailable fields: ['f0', 'a']");
    }

    @Test
    public void testCreateTableLikeNestedWatermark() {
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).column("f1", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))})).build(), (String) null, Collections.emptyList(), Collections.emptyMap()), ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
        Assertions.assertThatThrownBy(() -> {
            parseAndConvert("create table derivedTable(\n  a int,\n  watermark for f1.t as f1.t - interval '5' second\n)\nlike sourceTable");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The rowtime attribute field 'f1.t' is not defined in the table schema, at line 3, column 20\nNested field 't' was not found in a composite type: ROW<`tmstmp` TIMESTAMP(3)>.");
    }

    @Test
    public void testCreateTableWithFullDataTypes() {
        List asList = Arrays.asList(createTestItem("CHAR", DataTypes.CHAR(1)), createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()), createTestItem("CHAR NULL", DataTypes.CHAR(1)), createTestItem("CHAR(33)", DataTypes.CHAR(33)), createTestItem("VARCHAR", DataTypes.STRING()), createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)), createTestItem("STRING", DataTypes.STRING()), createTestItem("BOOLEAN", DataTypes.BOOLEAN()), createTestItem("BINARY", DataTypes.BINARY(1)), createTestItem("BINARY(33)", DataTypes.BINARY(33)), createTestItem("VARBINARY", DataTypes.BYTES()), createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)), createTestItem("BYTES", DataTypes.BYTES()), createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)), createTestItem("DEC", DataTypes.DECIMAL(10, 0)), createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)), createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)), createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)), createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)), createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)), createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)), createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)), createTestItem("TINYINT", DataTypes.TINYINT()), createTestItem("SMALLINT", DataTypes.SMALLINT()), createTestItem("INTEGER", DataTypes.INT()), createTestItem("INT", DataTypes.INT()), createTestItem("BIGINT", DataTypes.BIGINT()), createTestItem("FLOAT", DataTypes.FLOAT()), createTestItem("DOUBLE", DataTypes.DOUBLE()), createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()), createTestItem("DATE", DataTypes.DATE()), createTestItem("TIME", DataTypes.TIME()), createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()), createTestItem("TIME(3)", DataTypes.TIME()), createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()), createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)), createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(6)), createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)), createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)), createTestItem("TIMESTAMP WITH LOCAL TIME ZONE", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)), createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))), createTestItem("ARRAY<INT NOT NULL>", DataTypes.ARRAY(DataTypes.INT().notNull())), createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())), createTestItem("INT NOT NULL ARRAY", DataTypes.ARRAY(DataTypes.INT().notNull())), createTestItem("INT ARRAY NOT NULL", DataTypes.ARRAY(DataTypes.INT()).notNull()), createTestItem("MULTISET<INT NOT NULL>", DataTypes.MULTISET(DataTypes.INT().notNull())), createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())), createTestItem("INT NOT NULL MULTISET", DataTypes.MULTISET(DataTypes.INT().notNull())), createTestItem("INT MULTISET NOT NULL", DataTypes.MULTISET(DataTypes.INT()).notNull()), createTestItem("MAP<BIGINT, BOOLEAN>", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())), createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())})), createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())})), createTestItem("ROW<`f0` INT>", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT())})), createTestItem("ROW(`f0` INT)", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT())})), createTestItem("ROW<>", DataTypes.ROW()), createTestItem("ROW()", DataTypes.ROW()), createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', f1 BOOLEAN 'This as well.'>", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())})), createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>", DataTypes.ARRAY(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())}))), createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET", DataTypes.MULTISET(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())}))), createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>", DataTypes.MULTISET(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.BOOLEAN())}))), createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, f1 INT ARRAY, f2 BOOLEAN MULTISET>", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f00", DataTypes.INT()), DataTypes.FIELD("f01", DataTypes.BOOLEAN())})), DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())), DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))})));
        StringBuilder sb = new StringBuilder("create table t1(\n");
        for (int i = 0; i < asList.size(); i++) {
            sb.append("f").append(i).append(" ").append(((TestItem) asList.get(i)).testExpr);
            if (i == asList.size() - 1) {
                sb.append(")");
            } else {
                sb.append(",\n");
            }
        }
        String sb2 = sb.toString();
        FlinkPlannerImpl plannerBySqlDialect = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        SqlNode parse = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sb2);
        Assertions.assertThat(parse).isInstanceOf(SqlCreateTable.class);
        Assertions.assertThat(((Operation) SqlNodeToOperationConversion.convert(plannerBySqlDialect, this.catalogManager, parse).get()).getCatalogTable().getSchema().getFieldDataTypes()).isEqualTo(asList.stream().map(testItem -> {
            return testItem.expectedType;
        }).toArray());
    }

    @Test
    public void testCreateTableWithComputedColumn() {
        this.functionCatalog.registerTempCatalogScalarFunction(ObjectIdentifier.of("builtin", "default", "my_udf1"), Func0$.MODULE$);
        this.functionCatalog.registerTempCatalogScalarFunction(ObjectIdentifier.of("builtin", "default", "my_udf2"), Func1$.MODULE$);
        this.functionCatalog.registerTempCatalogScalarFunction(ObjectIdentifier.of("builtin", "default", "my_udf3"), Func8$.MODULE$);
        CreateTableOperation parse = parse("CREATE TABLE tbl1 (\n  a int,\n  b varchar, \n  c as a - 1, \n  d as b || '$$', \n  e as my_udf1(a),  f as `default`.my_udf2(a) + 1,  g as builtin.`default`.my_udf3(a) || '##'\n)\n  with (\n    'connector' = 'kafka', \n    'kafka.topic' = 'log.test'\n)\n", getPlannerBySqlDialect(SqlDialect.DEFAULT), getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse).isInstanceOf(CreateTableOperation.class);
        CatalogTable catalogTable = parse.getCatalogTable();
        Assertions.assertThat(catalogTable.getSchema().getFieldNames()).isEqualTo(new String[]{"a", "b", "c", "d", "e", "f", "g"});
        Assertions.assertThat(catalogTable.getSchema().getFieldDataTypes()).isEqualTo(new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING(), (DataType) DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.STRING()});
        Stream stream = catalogTable.getSchema().getTableColumns().stream();
        Class<TableColumn.ComputedColumn> cls = TableColumn.ComputedColumn.class;
        TableColumn.ComputedColumn.class.getClass();
        Stream filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<TableColumn.ComputedColumn> cls2 = TableColumn.ComputedColumn.class;
        TableColumn.ComputedColumn.class.getClass();
        Assertions.assertThat((String[]) filter.map((v1) -> {
            return r1.cast(v1);
        }).map((v0) -> {
            return v0.getExpression();
        }).toArray(i -> {
            return new String[i];
        })).isEqualTo(new String[]{"`a` - 1", "`b` || '$$'", "`builtin`.`default`.`my_udf1`(`a`)", "`builtin`.`default`.`my_udf2`(`a`) + 1", "`builtin`.`default`.`my_udf3`(`a`) || '##'"});
    }

    @Test
    public void testCreateTableWithMetadataColumn() {
        CreateTableOperation parse = parse("CREATE TABLE tbl1 (\n  a INT,\n  b STRING,\n  c INT METADATA,\n  d INT METADATA FROM 'other.key',\n  e INT METADATA VIRTUAL\n)\n  WITH (\n    'connector' = 'kafka',\n    'kafka.topic' = 'log.test'\n)\n", getPlannerBySqlDialect(SqlDialect.DEFAULT), getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse).isInstanceOf(CreateTableOperation.class);
        TableSchema schema = parse.getCatalogTable().getSchema();
        Assertions.assertThat(schema).isEqualTo(TableSchema.builder().add(TableColumn.physical("a", DataTypes.INT())).add(TableColumn.physical("b", DataTypes.STRING())).add(TableColumn.metadata("c", DataTypes.INT())).add(TableColumn.metadata("d", DataTypes.INT(), "other.key")).add(TableColumn.metadata("e", DataTypes.INT(), true)).build());
    }

    @Test
    public void testCreateFunction() {
        FlinkPlannerImpl plannerBySqlDialect = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CreateCatalogFunctionOperation parse = parse("CREATE FUNCTION test_udf AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar'", plannerBySqlDialect, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse).isInstanceOf(CreateCatalogFunctionOperation.class);
        CatalogFunction catalogFunction = parse.getCatalogFunction();
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("CREATE CATALOG FUNCTION: (catalogFunction: [Optional[This is a user-defined function]], identifier: [`builtin`.`default`.`test_udf`], ignoreIfExists: [false], isTemporary: [false])");
        Assertions.assertThat(catalogFunction).isEqualTo(new CatalogFunctionImpl("org.apache.fink.function.function1", FunctionLanguage.JAVA, Collections.singletonList(new ResourceUri(ResourceType.JAR, "file:///path/to/test.jar"))));
        Operation parse2 = parse("CREATE TEMPORARY SYSTEM FUNCTION test_udf2 AS 'org.apache.fink.function.function2' LANGUAGE SCALA USING JAR 'file:///path/to/test.jar'", plannerBySqlDialect, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(parse2).isInstanceOf(CreateTempSystemFunctionOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("CREATE TEMPORARY SYSTEM FUNCTION: (functionName: [test_udf2], catalogFunction: [CatalogFunctionImpl{className='org.apache.fink.function.function2', functionLanguage='SCALA', functionResource='[ResourceUri{resourceType=JAR, uri='file:///path/to/test.jar'}]'}], ignoreIfExists: [false], functionLanguage: [SCALA])");
    }

    @Test
    public void testAlterTable() throws Exception {
        prepareNonManagedTable(false);
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        ObjectIdentifier of2 = ObjectIdentifier.of("cat1", "db1", "tb2");
        for (String str : new String[]{"alter table cat1.db1.tb1 rename to tb2", "alter table db1.tb1 rename to tb2", "alter table tb1 rename to cat1.db1.tb2"}) {
            AlterTableRenameOperation parse = parse(str);
            Assertions.assertThat(parse).isInstanceOf(AlterTableRenameOperation.class);
            AlterTableRenameOperation alterTableRenameOperation = parse;
            Assertions.assertThat(alterTableRenameOperation.getTableIdentifier()).isEqualTo(of);
            Assertions.assertThat(alterTableRenameOperation.getNewTableIdentifier()).isEqualTo(of2);
        }
        checkAlterNonExistTable("alter table %s nonexistent rename to tb2");
        checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 'K2' = 'V2')");
        Operation parse2 = parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
        HashMap hashMap = new HashMap();
        hashMap.put("connector", "dummy");
        hashMap.put("k", "v");
        hashMap.put("k1", "v1");
        hashMap.put("K2", "V2");
        assertAlterTableOptions(parse2, of, hashMap, Arrays.asList(TableChange.set("k1", "v1"), TableChange.set("K2", "V2")), "ALTER TABLE IF EXISTS cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 'K2' = 'V2'");
        checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
        assertAlterTableOptions(parse("alter table if exists cat1.db1.tb1 reset ('k')"), of, Collections.singletonMap("connector", "dummy"), Collections.singletonList(TableChange.reset("k")), "ALTER TABLE IF EXISTS cat1.db1.tb1\n  RESET 'k'");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table cat1.db1.tb1 reset ('connector')");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table cat1.db1.tb1 reset ()");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("ALTER TABLE RESET does not support empty key");
    }

    @Test
    public void testAlterTableRenameColumn() throws Exception {
        prepareTable("tb1", false, false, true, 3);
        AlterTableChangeOperation parse = parse("alter table tb1 rename c to c1");
        Assertions.assertThat(parse).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `c` TO `c1`");
        Assertions.assertThat(parse.getNewTable().getUnresolvedSchema()).isEqualTo(Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c1", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").watermark("ts", "ts - interval '5' seconds").primaryKeyNamed("ct1", new String[]{"a", "b", "c1"}).build());
        AlterTableChangeOperation parse2 = parse("alter table tb1 rename f to f1");
        Assertions.assertThat(parse2).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `f` TO `f1`");
        Assertions.assertThat(parse2.getNewTable().getUnresolvedSchema()).isEqualTo(Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f1", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").watermark("ts", "ts - interval '5' seconds").primaryKeyNamed("ct1", new String[]{"a", "b", "c"}).build());
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 rename a to a1");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `a` is referenced by computed column `d`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 rename ts to ts1");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `ts` is referenced by watermark expression.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 rename e.f1 to e.f11");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f1 is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 rename c to a");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `a` already existed in table schema.");
        ((Catalog) this.catalogManager.getCatalog("cat1").get()).createTable(new ObjectPath("db1", "tb2"), CatalogTable.of(Schema.newBuilder().column("a", DataTypes.STRING().notNull()).column("b", DataTypes.INT().notNull()).column("e", DataTypes.STRING()).columnByExpression("j", (Expression) Expressions.$("e").upperCase()).columnByExpression("g", "TO_TIMESTAMP(e)").primaryKey(new String[]{"a", "b"}).build(), "tb2", Collections.singletonList("a"), Collections.emptyMap()), true);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table `cat1`.`db1`.`tb2` rename e to e1");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `g`, `j`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 rename a to a1");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
        checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
    }

    @Test
    public void testFailedToAlterTableDropColumn() throws Exception {
        prepareTable("tb1", false, false, true, 3);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop x");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `x` does not exist in the base table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop (g, x)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `x` does not exist in the base table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop (g, c, g)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Duplicate column `g`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop e.f2");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f2 is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop a");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `a` is referenced by computed column `d`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop c");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `c` is used as the primary key.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop ts");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The column `ts` is referenced by watermark expression.");
        checkAlterNonExistTable("alter table %s nonexistent drop a");
    }

    @Test
    public void testAlterTableDropColumn() throws Exception {
        prepareNonManagedTable(false);
        AlterTableChangeOperation parse = parse("alter table tb1 drop c");
        Assertions.assertThat(parse).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP `c`");
        Assertions.assertThat((List) parse.getNewTable().getUnresolvedSchema().getColumns().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList())).doesNotContain(new String[]{"c"});
        AlterTableChangeOperation parse2 = parse("alter table tb1 drop (f, e, b, d)");
        Assertions.assertThat(parse2).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP `d`,\n  DROP `f`,\n  DROP `b`,\n  DROP `e`");
        Assertions.assertThat((List) parse2.getNewTable().getUnresolvedSchema().getColumns().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList())).doesNotContain(new String[]{"f", "e", "b", "d"});
    }

    @Test
    public void testFailedToAlterTableDropConstraint() throws Exception {
        prepareNonManagedTable("tb1", 0);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop primary key");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define any primary key.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop constraint ct");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define any primary key.");
        prepareNonManagedTable("tb2", 1);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 drop constraint ct2");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");
        checkAlterNonExistTable("alter table %s nonexistent drop primary key");
        checkAlterNonExistTable("alter table %s nonexistent drop constraint ct");
    }

    @Test
    public void testAlterTableDropConstraint() throws Exception {
        prepareNonManagedTable(true);
        AlterTableChangeOperation parse = parse("alter table tb1 drop constraint ct1");
        Assertions.assertThat(parse).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP CONSTRAINT ct1");
        Assertions.assertThat(parse.getNewTable().getUnresolvedSchema().getPrimaryKey()).isNotPresent();
        AlterTableChangeOperation parse2 = parse("alter table tb1 drop primary key");
        Assertions.assertThat(parse2).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP CONSTRAINT ct1");
        Assertions.assertThat(parse2.getNewTable().getUnresolvedSchema().getPrimaryKey()).isNotPresent();
    }

    @Test
    public void testFailedToAlterTableDropWatermark() throws Exception {
        prepareNonManagedTable("tb1", false);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 drop watermark");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define any watermark strategy.");
        checkAlterNonExistTable("alter table %s nonexistent drop watermark");
    }

    @Test
    public void testAlterTableDropWatermark() throws Exception {
        prepareNonManagedTable("tb1", true);
        AlterTableChangeOperation parse = parse("alter table tb1 drop watermark");
        Assertions.assertThat(parse).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP WATERMARK");
        Assertions.assertThat(parse.getNewTable().getUnresolvedSchema().getWatermarkSpecs()).isEmpty();
    }

    @Test
    public void testAlterTableCompactOnNonManagedTable() throws Exception {
        prepareNonManagedTable(false);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 compact");
        }).isInstanceOf(ValidationException.class).hasMessage("ALTER TABLE COMPACT operation is not supported for non-managed table `cat1`.`db1`.`tb1`");
    }

    @Test
    public void testAlterTableCompactOnManagedNonPartitionedTable() throws Exception {
        prepareManagedTable(false);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 partition(dt = 'a') compact");
        }).isInstanceOf(ValidationException.class).hasMessage("Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned.");
        checkAlterNonExistTable("alter table %s nonexistent compact");
        checkAlterTableCompact(parse("alter table tb1 compact"), Collections.emptyMap());
    }

    @Test
    public void testAlterTableCompactOnManagedPartitionedTable() throws Exception {
        prepareManagedTable(true);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 partition (dt = 'a') compact");
        }).isInstanceOf(ValidationException.class).hasMessage("Partition column 'dt' not defined in the table schema. Available ordered partition columns: ['b', 'c']");
        HashMap hashMap = new HashMap();
        hashMap.put("b", "0");
        hashMap.put("c", "flink");
        checkAlterTableCompact(parse("alter table tb1 partition (b = 0, c = 'flink') compact"), hashMap);
        checkAlterTableCompact(parse("alter table tb1 partition (b = 0) compact"), Collections.singletonMap("b", "0"));
        checkAlterTableCompact(parse("alter table tb1 partition (c = 'flink') compact"), Collections.singletonMap("c", "flink"));
        checkAlterTableCompact(parse("alter table tb1 compact"), Collections.emptyMap());
    }

    @Test
    public void testFailedToAlterTableAddColumn() throws Exception {
        prepareNonManagedTable("tb1", 0);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add a bigint");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Try to add a column `a` which already exists in the table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (x array<string>, x string)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Encounter duplicate column `x`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add x bigint after y");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Referenced column `y` by 'AFTER' does not exist in the table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (x bigint after y, y string first)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Referenced column `y` by 'AFTER' does not exist in the table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add m as n + 2");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'm'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (m as b * 2, n as m + 2)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'n'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (m as 'hello' || b)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'm'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (e.f3 string)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (x string after e.f2)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (e.f3 string after e.f1)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
        checkAlterNonExistTable("alter table %s nonexistent add a bigint not null");
    }

    @Test
    public void testAlterTableAddColumn() throws Exception {
        prepareNonManagedTable("tb1", 0);
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema unresolvedSchema = ((ContextResolvedTable) this.catalogManager.getTable(of).get()).getTable().getUnresolvedSchema();
        Operation parse = parse("alter table if exists tb1 add h double not null comment 'h is double not null'");
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE IF EXISTS cat1.db1.tb1\n  ADD `h` DOUBLE NOT NULL COMMENT 'h is double not null' ");
        assertAlterTableSchema(parse, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("h", DataTypes.DOUBLE().notNull()).withComment("h is double not null").build());
        Operation parse2 = parse("alter table tb1 add (\n h as e.f2.f1 first,\n i as b*2 after b,\n j int metadata from 'mk1' virtual comment 'comment_metadata' first,\n k string primary key not enforced after h)");
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `h` ARRAY<FLOAT> AS `e`.`f2`.`f1` FIRST,\n  ADD `i` BIGINT NOT NULL AS `b` * 2 AFTER `b`,\n  ADD `j` INT METADATA FROM 'mk1' VIRTUAL COMMENT 'comment_metadata' FIRST,\n  ADD `k` STRING NOT NULL AFTER `h`,\n  ADD CONSTRAINT `PK_k` PRIMARY KEY (`k`) NOT ENFORCED");
        assertAlterTableSchema(parse2, of, Schema.newBuilder().columnByMetadata("j", DataTypes.INT(), "mk1", true).withComment("comment_metadata").columnByExpression("h", "`e`.`f2`.`f1`").column("k", DataTypes.STRING().notNull()).column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).columnByExpression("i", new SqlCallExpression("`b` * 2")).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").primaryKey(new String[]{"k"}).build());
        Operation parse3 = parse("alter table tb1 add (\n r row<r1 bigint, r2 string, r3 array<double> not null> not null comment 'add composite type',\n m map<string not null, int not null>,\n n as r.r1 * 2 after r,\n tss as to_timestamp(r.r2) comment 'rowtime' after ts,\n na as r.r3 after ts)");
        Assertions.assertThat(parse3.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `r` ROW<`r1` BIGINT, `r2` STRING, `r3` ARRAY<DOUBLE> NOT NULL> NOT NULL COMMENT 'add composite type' ,\n  ADD `m` MAP<STRING NOT NULL, INT NOT NULL> ,\n  ADD `n` BIGINT AS `r`.`r1` * 2 AFTER `r`,\n  ADD `tss` TIMESTAMP(3) AS `to_timestamp`(`r`.`r2`) COMMENT 'rowtime' AFTER `ts`,\n  ADD `na` ARRAY<DOUBLE> NOT NULL AS `r`.`r3` AFTER `ts`");
        assertAlterTableSchema(parse3, of, Schema.newBuilder().fromSchema(unresolvedSchema).columnByExpression("na", "`r`.`r3`").columnByExpression("tss", "`to_timestamp`(`r`.`r2`)").withComment("rowtime").column("r", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("r1", DataTypes.BIGINT()), DataTypes.FIELD("r2", DataTypes.STRING()), DataTypes.FIELD("r3", DataTypes.ARRAY(DataTypes.DOUBLE()).notNull())}).notNull()).withComment("add composite type").columnByExpression("n", "`r`.`r1` * 2").column("m", DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT().notNull())).build());
    }

    @Test
    public void testFailedToAlterTableAddPk() throws Exception {
        prepareNonManagedTable("tb1", 1);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add primary key(c) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table has already defined the primary key constraint [`a`]. You might want to drop it before adding a new one.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add x string not null primary key not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table has already defined the primary key constraint [`a`]. You might want to drop it before adding a new one");
        prepareNonManagedTable("tb2", 2);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 add primary key(c) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table has already defined the primary key constraint [`a`, `b`]. You might want to drop it before adding a new one");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 add x string not null primary key not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table has already defined the primary key constraint [`a`, `b`]. You might want to drop it before adding a new one");
        prepareNonManagedTable("tb3", 0);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb3 add primary key (x) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'PK_x'. Column 'x' does not exist.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb3 add unique(b)");
        }).isInstanceOf(SqlValidateException.class).hasMessageContaining("UNIQUE constraint is not supported yet");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb3 add primary key(b)");
        }).isInstanceOf(SqlValidateException.class).hasMessageContaining("Flink doesn't support ENFORCED mode for PRIMARY KEY constraint");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb3 add (\n  x as upper(c),\n  primary key (d, x) not enforced)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb3 add (primary key (g) not enforced)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
        checkAlterNonExistTable("alter table %s nonexistent add primary key(x) not enforced");
    }

    @Test
    public void testAlterTableAddPrimaryKey() throws Exception {
        prepareNonManagedTable("tb1", 0);
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema unresolvedSchema = ((ContextResolvedTable) this.catalogManager.getTable(of).get()).getTable().getUnresolvedSchema();
        Operation parse = parse("alter table tb1 add constraint my_pk primary key (a, b) not enforced");
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD CONSTRAINT `my_pk` PRIMARY KEY (`a`, `b`) NOT ENFORCED");
        assertAlterTableSchema(parse, of, Schema.newBuilder().fromSchema(unresolvedSchema).primaryKeyNamed("my_pk", new String[]{"a", "b"}).build());
        Operation parse2 = parse("alter table tb1 add x bigint not null primary key not enforced");
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `x` BIGINT NOT NULL ,\n  ADD CONSTRAINT `PK_x` PRIMARY KEY (`x`) NOT ENFORCED");
        assertAlterTableSchema(parse2, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("x", DataTypes.BIGINT().notNull()).primaryKey(new String[]{"x"}).build());
        Operation parse3 = parse("alter table tb1 add x bigint primary key not enforced");
        Assertions.assertThat(parse3.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `x` BIGINT NOT NULL ,\n  ADD CONSTRAINT `PK_x` PRIMARY KEY (`x`) NOT ENFORCED");
        assertAlterTableSchema(parse3, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("x", DataTypes.BIGINT().notNull()).primaryKey(new String[]{"x"}).build());
        Operation parse4 = parse("alter table tb1 add constraint ct primary key(ts) not enforced");
        Assertions.assertThat(parse4.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD CONSTRAINT `ct` PRIMARY KEY (`ts`) NOT ENFORCED");
        assertAlterTableSchema(parse4, of, Schema.newBuilder().fromColumns(unresolvedSchema.getColumns().subList(0, unresolvedSchema.getColumns().size() - 1)).column("ts", DataTypes.TIMESTAMP(3).notNull()).withComment("just a comment").primaryKeyNamed("ct", new String[]{"ts"}).build());
    }

    @Test
    public void testFailedToAlterTableAddWatermark() throws Exception {
        prepareNonManagedTable("tb1", false);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add watermark for x as x");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid column name 'x' for rowtime attribute in watermark declaration. Available columns are: [a, b, c, d, e, f, g, ts]");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add watermark for b as b");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BIGINT NOT NULL");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 add (x row<f0 string, f1 timestamp(3)>, watermark for x.f1 as x.f1)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Watermark strategy on nested column is not supported yet.");
        prepareNonManagedTable("tb2", true);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 add watermark for ts as ts");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table has already defined the watermark strategy `ts` AS ts - interval '5' seconds. You might want to drop it before adding a new one.");
        checkAlterNonExistTable("alter table %s nonexistent add watermark for ts as ts");
    }

    @Test
    public void testAlterTableAddWatermark() throws Exception {
        prepareNonManagedTable("tb1", false);
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema unresolvedSchema = ((ContextResolvedTable) this.catalogManager.getTable(of).get()).getTable().getUnresolvedSchema();
        Operation parse = parse("alter table tb1 add watermark for ts as ts");
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD WATERMARK FOR `ts`: TIMESTAMP(3) AS `ts`");
        assertAlterTableSchema(parse, of, Schema.newBuilder().fromSchema(unresolvedSchema).watermark("ts", "`ts`").build());
        Operation parse2 = parse("alter table tb1 add (tss timestamp(3) not null, watermark for tss as tss)");
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `tss` TIMESTAMP(3) NOT NULL ,\n  ADD WATERMARK FOR `tss`: TIMESTAMP(3) NOT NULL AS `tss`");
        assertAlterTableSchema(parse2, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("tss", DataTypes.TIMESTAMP(3).notNull()).watermark("tss", "`tss`").build());
        Operation parse3 = parse("alter table tb1 add (log_ts string not null,\ntss as to_timestamp(log_ts),\nwatermark for tss as tss - interval '3' second)");
        Assertions.assertThat(parse3.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `log_ts` STRING NOT NULL ,\n  ADD `tss` TIMESTAMP(3) AS `to_timestamp`(`log_ts`) ,\n  ADD WATERMARK FOR `tss`: TIMESTAMP(3) AS `tss` - INTERVAL '3' SECOND");
        assertAlterTableSchema(parse3, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("log_ts", DataTypes.STRING().notNull()).columnByExpression("tss", "`to_timestamp`(`log_ts`)").watermark("tss", "`tss` - INTERVAL '3' SECOND").build());
        Operation parse4 = parse("alter table tb1 add (x row<f0 string, f1 timestamp(3) not null> not null, y as x.f1, watermark for y as y - interval '1' day)");
        Assertions.assertThat(parse4.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `x` ROW<`f0` STRING, `f1` TIMESTAMP(3) NOT NULL> NOT NULL ,\n  ADD `y` TIMESTAMP(3) NOT NULL AS `x`.`f1` ,\n  ADD WATERMARK FOR `y`: TIMESTAMP(3) NOT NULL AS `y` - INTERVAL '1' DAY");
        assertAlterTableSchema(parse4, of, Schema.newBuilder().fromSchema(unresolvedSchema).column("x", DataTypes.ROW(new DataType[]{DataTypes.STRING(), (DataType) DataTypes.TIMESTAMP(3).notNull()}).notNull()).columnByExpression("y", "`x`.`f1`").watermark("y", "`y` - INTERVAL '1' DAY").build());
    }

    @Test
    public void testFailedToAlterTableModifyColumn() throws Exception {
        prepareNonManagedTable("tb1", true);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify (b int, b array<int not null>)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Encounter duplicate column `b`.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify x bigint");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Try to modify a column `x` which does not exist in the table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify a bigint after x");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Referenced column `x` by 'AFTER' does not exist in the table.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify e array<int>");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'f'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify a string");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'd'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify b as a + 2");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'd'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify (a timestamp(3), b multiset<int>)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid expression for computed column 'd'.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify ts int");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is INT");
        prepareNonManagedTable("tb2", 1);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify (d int, a as b + 2)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'ct1'. Column 'a' is not a physical column.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify (d string, a int metadata virtual)");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'ct1'. Column 'a' is not a physical column.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify (e.f0 string)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify (g string after e.f2)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type is not supported yet.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify (e.f0 string after e.f1)");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
        checkAlterNonExistTable("alter table %s nonexistent modify a int first");
    }

    @Test
    public void testAlterTableModifyColumn() throws Exception {
        prepareNonManagedTable("tb1", 2);
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        Operation parse = parse("alter table tb1 modify b bigint not null comment 'move b to first and add comment' first");
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `b` COMMENT 'move b to first and add comment',\n  MODIFY `b` FIRST");
        assertAlterTableSchema(parse, of, Schema.newBuilder().column("b", DataTypes.BIGINT().notNull()).withComment("move b to first and add comment").column("a", DataTypes.INT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").primaryKeyNamed("ct1", new String[]{"a", "b"}).build());
        Operation parse2 = parse("alter table tb1 modify ts timestamp(3) not null after e");
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `ts` TIMESTAMP(3) NOT NULL,\n  MODIFY `ts` AFTER `e`");
        assertAlterTableSchema(parse2, of, Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).column("ts", DataTypes.TIMESTAMP(3).notNull()).withComment("just a comment").columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).primaryKeyNamed("ct1", new String[]{"a", "b"}).build());
        Operation parse3 = parse("alter table tb1 modify (\nd as a + 2 comment 'change d' after b,\nc bigint first,\ne string comment 'change e',\nf as upper(e) comment 'change f' after ts,\ng int not null comment 'change g',\nconstraint ct2 primary key(e) not enforced)");
        Assertions.assertThat(parse3.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `d` INT NOT NULL AS `a` + 2 COMMENT 'change d' AFTER `b`,\n  MODIFY `c` BIGINT,\n  MODIFY `c` FIRST,\n  MODIFY `e` COMMENT 'change e',\n  MODIFY `e` STRING NOT NULL,\n  MODIFY `f` STRING NOT NULL AS UPPER(`e`) COMMENT 'change f' AFTER `ts`,\n  MODIFY `g` INT NOT NULL COMMENT 'change g' ,\n  MODIFY CONSTRAINT `ct2` PRIMARY KEY (`e`) NOT ENFORCED");
        assertAlterTableSchema(parse3, of, Schema.newBuilder().column("c", DataTypes.BIGINT()).withComment("column comment").column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).columnByExpression("d", "`a` + 2").withComment("change d").column("e", DataTypes.STRING().notNull()).withComment("change e").column("g", DataTypes.INT().notNull()).withComment("change g").column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").columnByExpression("f", "UPPER(`e`)").withComment("change f").primaryKeyNamed("ct2", new String[]{"e"}).build());
        prepareNonManagedTable("tb2", true);
        ObjectIdentifier of2 = ObjectIdentifier.of("cat1", "db1", "tb2");
        Operation parse4 = parse("alter table tb2 modify (ts int comment 'change ts',\nf timestamp(3) not null,\ne int metadata virtual,\nwatermark for f as f,\ng multiset<int> not null comment 'change g' first)");
        Assertions.assertThat(parse4.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb2\n  MODIFY `ts` COMMENT 'change ts',\n  MODIFY `ts` INT,\n  MODIFY `f` TIMESTAMP(3) NOT NULL ,\n  MODIFY `e` INT METADATA VIRTUAL ,\n  MODIFY `g` MULTISET<INT> NOT NULL COMMENT 'change g' FIRST,\n  MODIFY WATERMARK FOR `f`: TIMESTAMP(3) NOT NULL AS `f`");
        assertAlterTableSchema(parse4, of2, Schema.newBuilder().column("g", DataTypes.MULTISET(DataTypes.INT()).notNull()).withComment("change g").column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").columnByMetadata("e", DataTypes.INT(), (String) null, true).column("f", DataTypes.TIMESTAMP(3).notNull()).column("ts", DataTypes.INT()).withComment("change ts").watermark("f", "`f`").build());
    }

    @Test
    public void testFailedToAlterTableModifyPk() throws Exception {
        prepareNonManagedTable("tb1", 0);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify constraint ct primary key (b) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define any primary key constraint. You might want to add a new one.");
        prepareNonManagedTable("tb2", 1);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify constraint ct primary key (x) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'ct'. Column 'x' does not exist.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify constraint ct primary key (d) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'ct'. Column 'd' is not a physical column.");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify constraint ct primary key (g) not enforced");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid primary key 'ct'. Column 'g' is not a physical column.");
        checkAlterNonExistTable("alter table %s nonexistent modify constraint ct primary key(a) not enforced");
    }

    @Test
    public void testAlterTableModifyPk() throws Exception {
        prepareNonManagedTable("tb1", 1);
        assertAlterTableSchema(parse("alter table tb1 modify constraint ct2 primary key (a, b) not enforced"), ObjectIdentifier.of("cat1", "db1", "tb1"), Schema.newBuilder().fromColumns(((ContextResolvedTable) this.catalogManager.getTable(ObjectIdentifier.of("cat1", "db1", "tb1")).get()).getTable().getUnresolvedSchema().getColumns()).primaryKeyNamed("ct2", new String[]{"a", "b"}).build());
        assertAlterTableSchema(parse("alter table tb1 modify primary key (c, a) not enforced"), ObjectIdentifier.of("cat1", "db1", "tb1"), Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").primaryKeyNamed("PK_c_a", new String[]{"c", "a"}).build());
    }

    @Test
    public void testFailedToAlterTableModifyWatermark() throws Exception {
        prepareNonManagedTable("tb1", false);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb1 modify watermark for a as to_timestamp(a) - interval '1' minute");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("The base table does not define any watermark. You might want to add a new one.");
        prepareNonManagedTable("tb2", true);
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify watermark for a as a");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is INT NOT NULL");
        Assertions.assertThatThrownBy(() -> {
            parse("alter table tb2 modify watermark for c as to_timestamp(c) - interval '1' day");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is STRING");
        checkAlterNonExistTable("alter table %s nonexistent modify watermark for ts as ts");
    }

    @Test
    public void testAlterTableModifyWatermark() throws Exception {
        prepareNonManagedTable("tb1", true);
        Operation parse = parse("alter table tb1 modify watermark for ts as ts");
        ObjectIdentifier of = ObjectIdentifier.of("cat1", "db1", "tb1");
        List columns = ((ContextResolvedTable) this.catalogManager.getTable(of).get()).getTable().getUnresolvedSchema().getColumns();
        assertAlterTableSchema(parse, of, Schema.newBuilder().fromColumns(columns).watermark("ts", "`ts`").build());
        assertAlterTableSchema(parse("alter table tb1 modify (g timestamp(3) not null, watermark for g as g)"), of, Schema.newBuilder().fromColumns(columns.subList(0, columns.size() - 2)).column("g", DataTypes.TIMESTAMP(3).notNull()).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment").watermark("g", "`g`").build());
    }

    @Test
    public void testCreateViewWithMatchRecognize() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", TestValuesTableFactory.IDENTIFIER);
        hashMap.put("bounded", "true");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("id", DataTypes.INT().notNull()).column("measurement", DataTypes.BIGINT().notNull()).column("ts", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))})).build(), (String) null, Collections.emptyList(), hashMap), ObjectIdentifier.of("builtin", "default", "events"), false);
        Assertions.assertThat(parse("CREATE TEMPORARY VIEW foo AS SELECT * FROM events MATCH_RECOGNIZE (    PARTITION BY id     ORDER BY ts ASC     MEASURES       next_step.measurement - this_step.measurement AS diff     AFTER MATCH SKIP TO NEXT ROW     PATTERN (this_step next_step)    DEFINE          this_step AS TRUE,         next_step AS TRUE)")).isInstanceOf(CreateViewOperation.class);
    }

    @Test
    public void testCreateViewWithDynamicTableOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", TestValuesTableFactory.IDENTIFIER);
        hashMap.put("bounded", "true");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("f0", DataTypes.INT()).column("f1", DataTypes.VARCHAR(20)).build(), (String) null, Collections.emptyList(), hashMap), ObjectIdentifier.of("builtin", "default", "sourceA"), false);
        Assertions.assertThat(parse("create view test_view as\nselect *\nfrom sourceA /*+ OPTIONS('changelog-mode'='I') */")).isInstanceOf(CreateViewOperation.class);
    }

    @Test
    public void testAlterTableAddPartitions() throws Exception {
        prepareTable("tb1", false, true, true, 0);
        Operation parse = parse("alter table tb1 add partition (b = '1', c = '2')");
        Assertions.assertThat(parse).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1 ADD PARTITION (b=1, c=2)");
        Operation parse2 = parse("alter table tb1 add partition (b = '1', c = '2') with ('k' = 'v')");
        Assertions.assertThat(parse2).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1 ADD PARTITION (b=1, c=2) WITH (k: [v])");
        Operation parse3 = parse("alter table tb1 add if not exists partition (b = '1', c = '2') with ('k' = 'v') partition (b = '2')");
        Assertions.assertThat(parse3).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(parse3.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1 ADD IF NOT EXISTS PARTITION (b=1, c=2) WITH (k: [v]) PARTITION (b=2)");
    }

    @Test
    public void testAlterTableDropPartitions() throws Exception {
        prepareTable("tb1", false, true, true, 0);
        Operation parse = parse("alter table tb1 drop partition (b = '1', c = '2')");
        Assertions.assertThat(parse).isInstanceOf(DropPartitionsOperation.class);
        Assertions.assertThat(parse.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1 DROP PARTITION (b=1, c=2)");
        Operation parse2 = parse("alter table tb1 drop if exists partition (b = '1', c = '2'), partition (b = '2')");
        Assertions.assertThat(parse2).isInstanceOf(DropPartitionsOperation.class);
        Assertions.assertThat(parse2.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1 DROP IF EXISTS PARTITION (b=1, c=2) PARTITION (b=2)");
    }

    @Test
    public void testCreateViewWithDuplicateFieldName() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", TestValuesTableFactory.IDENTIFIER);
        hashMap.put("bounded", "true");
        this.catalogManager.createTable(CatalogTable.of(Schema.newBuilder().column("id", DataTypes.BIGINT().notNull()).column("uid", DataTypes.BIGINT().notNull()).build(), (String) null, Collections.emptyList(), hashMap), ObjectIdentifier.of("builtin", "default", "id_table"), false);
        Assertions.assertThat(parse("CREATE VIEW id_view(a, b) AS SELECT id, uid AS id FROM id_table")).isInstanceOf(CreateViewOperation.class);
        Assertions.assertThatThrownBy(() -> {
            parse("CREATE VIEW id_view(a, a) AS SELECT id, uid AS id FROM id_table");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlValidateException.class, "A column with the same name `a` has been defined at line 1, column 37.")});
        Assertions.assertThatThrownBy(() -> {
            parse("CREATE VIEW id_view AS\nSELECT id, uid AS id FROM id_table");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlValidateException.class, "A column with the same name `id` has been defined at line 2, column 8.")});
        Assertions.assertThatThrownBy(() -> {
            parse("CREATE VIEW union_view AS\n  SELECT id, uid AS id FROM id_table\n  UNION\n  SELECT uid, id AS uid FROM id_table");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlValidateException.class, "A column with the same name `id` has been defined at line 2, column 10.")});
        Assertions.assertThatThrownBy(() -> {
            parse("CREATE VIEW cte_view AS\nWITH id_num AS (\n  select id from id_table\n)\nSELECT id, uid as id\nFROM id_table\n");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlValidateException.class, "A column with the same name `id` has been defined at line 5, column 8.")});
    }

    private static TestItem createTestItem(Object... objArr) {
        Assertions.assertThat(objArr).hasSize(2);
        TestItem fromTestExpr = TestItem.fromTestExpr((String) objArr[0]);
        if (objArr[1] instanceof String) {
            fromTestExpr.withExpectedError((String) objArr[1]);
        } else {
            fromTestExpr.withExpectedType(objArr[1]);
        }
        return fromTestExpr;
    }

    private void prepareNonManagedTable(boolean z) throws Exception {
        prepareNonManagedTable("tb1", z ? 1 : 0);
    }

    private void prepareNonManagedTable(String str, int i) throws Exception {
        prepareTable(str, false, false, false, i);
    }

    private void prepareNonManagedTable(String str, boolean z) throws Exception {
        prepareTable(str, false, false, z, 0);
    }

    private void prepareManagedTable(boolean z) throws Exception {
        TestManagedTableFactory.MANAGED_TABLES.put(ObjectIdentifier.of("cat1", "db1", "tb1"), new AtomicReference());
        prepareTable("tb1", true, z, false, 0);
    }

    private void prepareTable(String str, boolean z, boolean z2, boolean z3, int i) throws Exception {
        GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("default", "default");
        if (!this.catalogManager.getCatalog("cat1").isPresent()) {
            this.catalogManager.registerCatalog("cat1", genericInMemoryCatalog);
        }
        this.catalogManager.createDatabase("cat1", "db1", new CatalogDatabaseImpl(new HashMap(), (String) null), true);
        Schema.Builder withComment = Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW(new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", DataTypes.STRING(), (String) null, true).column("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment");
        HashMap hashMap = new HashMap();
        hashMap.put("k", "v");
        if (!z) {
            hashMap.put("connector", "dummy");
        }
        if (i != 0) {
            if (i == 1) {
                withComment.primaryKeyNamed("ct1", new String[]{"a"});
            } else if (i == 2) {
                withComment.primaryKeyNamed("ct1", new String[]{"a", "b"});
            } else {
                if (i != 3) {
                    throw new IllegalArgumentException(String.format("Don't support to set pk with %s fields.", Integer.valueOf(i)));
                }
                withComment.primaryKeyNamed("ct1", new String[]{"a", "b", "c"});
            }
        }
        if (z3) {
            withComment.watermark("ts", "ts - interval '5' seconds");
        }
        CatalogTable of = CatalogTable.of(withComment.build(), "a table", z2 ? Arrays.asList("b", "c") : Collections.emptyList(), Collections.unmodifiableMap(hashMap));
        this.catalogManager.setCurrentCatalog("cat1");
        this.catalogManager.setCurrentDatabase("db1");
        this.catalogManager.createTable(of, ObjectIdentifier.of("cat1", "db1", str), true);
    }

    private void assertAlterTableOptions(Operation operation, ObjectIdentifier objectIdentifier, Map<String, String> map, List<TableChange> list, String str) {
        Assertions.assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
        AlterTableChangeOperation alterTableChangeOperation = (AlterTableChangeOperation) operation;
        Assertions.assertThat(alterTableChangeOperation.getTableIdentifier()).isEqualTo(objectIdentifier);
        Assertions.assertThat(alterTableChangeOperation.getNewTable().getOptions()).isEqualTo(map);
        Assertions.assertThat(list).isEqualTo(alterTableChangeOperation.getTableChanges());
        Assertions.assertThat(alterTableChangeOperation.asSummaryString()).isEqualTo(str);
    }

    private void assertAlterTableSchema(Operation operation, ObjectIdentifier objectIdentifier, Schema schema) {
        Assertions.assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
        AlterTableChangeOperation alterTableChangeOperation = (AlterTableChangeOperation) operation;
        Assertions.assertThat(alterTableChangeOperation.getTableIdentifier()).isEqualTo(objectIdentifier);
        Assertions.assertThat(alterTableChangeOperation.getNewTable().getUnresolvedSchema()).isEqualTo(schema);
    }

    private void checkAlterNonExistTable(String str) {
        Assertions.assertThat(parse(String.format(str, "if exists "))).isInstanceOf(NopOperation.class);
        Assertions.assertThatThrownBy(() -> {
            parse(String.format(str, ""));
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Table `cat1`.`db1`.`nonexistent` doesn't exist or is a temporary table.");
    }

    private void checkAlterTableCompact(Operation operation, Map<String, String> map) {
        Assertions.assertThat(operation).isInstanceOf(SinkModifyOperation.class);
        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
        Assertions.assertThat(sinkModifyOperation.getStaticPartitions()).containsExactlyInAnyOrderEntriesOf(map);
        Assertions.assertThat(sinkModifyOperation.isOverwrite()).isFalse();
        Assertions.assertThat(sinkModifyOperation.getDynamicOptions()).containsEntry("ENRICHED_KEY", "ENRICHED_VALUE");
        Assertions.assertThat(sinkModifyOperation.getContextResolvedTable().getIdentifier()).isEqualTo(ObjectIdentifier.of("cat1", "db1", "tb1"));
        Assertions.assertThat(sinkModifyOperation.getChild()).isInstanceOf(SourceQueryOperation.class);
        SourceQueryOperation child = sinkModifyOperation.getChild();
        Assertions.assertThat(child.getChildren()).isEmpty();
        Assertions.assertThat(child.getDynamicOptions()).containsEntry("k", "v");
        Assertions.assertThat(child.getDynamicOptions()).containsEntry("ENRICHED_KEY", "ENRICHED_VALUE");
    }

    private Operation parseAndConvert(String str) {
        return (Operation) SqlNodeToOperationConversion.convert(getPlannerBySqlDialect(SqlDialect.DEFAULT), this.catalogManager, getParserBySqlDialect(SqlDialect.DEFAULT).parse(str)).get();
    }
}
