package org.apache.flink.cdc.connectors.starrocks.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.class */
/**
 * Integration tests that push schema change events through a Flink job wired to the StarRocks
 * sink and then compare the resulting table schema with the expected column descriptions.
 *
 * <p>Every test creates the test database before running and drops it afterwards, so tests do
 * not see each other's tables.
 */
public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
    /** Parallelism used both for the execution environment and for the schema operator. */
    private static final int PARALLELISM = 1;

    /** Checkpoint interval, in milliseconds, configured on the shared environment. */
    private static final long CHECKPOINT_INTERVAL_MS = 3000L;

    /** Typed {@code null} used for columns declared without a comment. */
    private static final String NO_COMMENT = null;

    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /** Configures the shared execution environment once for all tests in this class. */
    @BeforeClass
    public static void before() {
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        // A failing job must surface immediately instead of being retried.
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    /** Creates the test database if it does not exist yet. */
    @Before
    public void initializeDatabase() {
        executeSql(String.format("CREATE DATABASE IF NOT EXISTS `%s`;", StarRocksContainer.STARROCKS_DATABASE_NAME));
        LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
    }

    /**
     * Drops the test database. The statement mirrors the quoting used on creation and uses
     * {@code IF EXISTS} so that teardown does not fail when the database is already gone.
     */
    @After
    public void destroyDatabase() {
        executeSql(String.format("DROP DATABASE IF EXISTS `%s`;", StarRocksContainer.STARROCKS_DATABASE_NAME));
        LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
    }

    /** Returns the identifier of the table every test in this class operates on. */
    private static TableId testTableId() {
        return TableId.tableId(StarRocksContainer.STARROCKS_DATABASE_NAME, StarRocksContainer.STARROCKS_TABLE_NAME);
    }

    /**
     * Builds the create-table event shared by all schema evolution scenarios: a non-null INT
     * primary key {@code id}, a DOUBLE {@code number} and a VARCHAR(17) {@code name}.
     */
    private static CreateTableEvent createBaseTable(TableId tableId) {
        Schema schema =
                Schema.newBuilder()
                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), NO_COMMENT))
                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), NO_COMMENT))
                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), NO_COMMENT))
                        .primaryKey("id")
                        .build();
        return new CreateTableEvent(tableId, schema);
    }

    /** Wraps a single column into an add-column event for the given table. */
    private static AddColumnEvent appendColumn(TableId tableId, PhysicalColumn column) {
        return new AddColumnEvent(
                tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(column)));
    }

    /** Base table followed by three separate add-column events (date, boolean, decimal). */
    private List<Event> generateAddColumnEvents(TableId tableId) {
        return Arrays.asList(
                createBaseTable(tableId),
                appendColumn(tableId, new PhysicalColumn("extra_date", DataTypes.DATE(), NO_COMMENT)),
                appendColumn(tableId, new PhysicalColumn("extra_bool", DataTypes.BOOLEAN(), NO_COMMENT)),
                appendColumn(
                        tableId,
                        new PhysicalColumn("extra_decimal", DataTypes.DECIMAL(17, 0), NO_COMMENT)));
    }

    /** Base table followed by dropping the {@code number} column. */
    private List<Event> generateDropColumnEvents(TableId tableId) {
        return Arrays.asList(
                createBaseTable(tableId),
                new DropColumnEvent(tableId, Collections.singletonList("number")));
    }

    /** Base table followed by renaming {@code number} and {@code name}. */
    private List<Event> generateRenameColumnEvents(TableId tableId) {
        return Arrays.asList(
                createBaseTable(tableId),
                new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
                new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
    }

    /** Base table followed by changing {@code name} from VARCHAR(17) to VARCHAR(19). */
    private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
        return Arrays.asList(
                createBaseTable(tableId),
                new AlterColumnTypeEvent(tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
    }

    /** Base table followed by changing {@code number} from DOUBLE to FLOAT. */
    private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
        return Arrays.asList(
                createBaseTable(tableId),
                new AlterColumnTypeEvent(tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
    }

    /**
     * Creates a table containing one column per tested CDC data type and checks the column
     * descriptions reported for the resulting table.
     */
    @Test
    public void testStarRocksDataType() throws Exception {
        TableId tableId = testTableId();
        Schema schema =
                Schema.newBuilder()
                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
                        .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
                        .column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
                        .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
                        .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
                        .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
                        .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
                        .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
                        .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
                        .column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
                        .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
                        .column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
                        .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
                        .column(
                                new PhysicalColumn(
                                        "timestamp_3", DataTypes.TIMESTAMP(3), "Timestamp With Precision"))
                        .column(new PhysicalColumn("timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
                        .column(
                                new PhysicalColumn(
                                        "timestampltz_3",
                                        DataTypes.TIMESTAMP_LTZ(3),
                                        "TimestampLTZ With Precision"))
                        .primaryKey("id")
                        .build();

        runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));

        // NOTE(review): char/varchar lengths are expected at 3x the declared length (17 -> 51);
        // presumably the sink widens them for multi-byte encoding - confirm against the sink.
        assertEqualsInOrder(
                Arrays.asList(
                        "id | int | NO | true | null",
                        "boolean | boolean | YES | false | null",
                        "int | int | YES | false | null",
                        "tinyint | tinyint | YES | false | null",
                        "smallint | smallint | YES | false | null",
                        "float | float | YES | false | null",
                        "double | double | YES | false | null",
                        "char | char(51) | YES | false | null",
                        "varchar | varchar(51) | YES | false | null",
                        "string | varchar(1048576) | YES | false | null",
                        "decimal | decimal(17,7) | YES | false | null",
                        "date | date | YES | false | null",
                        "timestamp | datetime | YES | false | null",
                        "timestamp_3 | datetime | YES | false | null",
                        "timestampltz | datetime | YES | false | null",
                        "timestampltz_3 | datetime | YES | false | null"),
                inspectTableSchema(tableId));
    }

    /** Added columns must show up after the original columns, in the order they were added. */
    @Test
    public void testStarRocksAddColumn() throws Exception {
        TableId tableId = testTableId();
        runJobWithEvents(generateAddColumnEvents(tableId));
        assertEqualsInOrder(
                Arrays.asList(
                        "id | int | NO | true | null",
                        "number | double | YES | false | null",
                        "name | varchar(51) | YES | false | null",
                        "extra_date | date | YES | false | null",
                        "extra_bool | boolean | YES | false | null",
                        "extra_decimal | decimal(17,0) | YES | false | null"),
                inspectTableSchema(tableId));
    }

    /** The dropped {@code number} column must no longer be part of the table schema. */
    @Test
    public void testStarRocksDropColumn() throws Exception {
        TableId tableId = testTableId();
        runJobWithEvents(generateDropColumnEvents(tableId));
        assertEqualsInOrder(
                Arrays.asList(
                        "id | int | NO | true | null",
                        "name | varchar(51) | YES | false | null"),
                inspectTableSchema(tableId));
    }

    /** Renamed columns must keep their type and position under the new names. */
    @Test
    @Ignore("Rename column is not supported currently.")
    public void testStarRocksRenameColumn() throws Exception {
        TableId tableId = testTableId();
        runJobWithEvents(generateRenameColumnEvents(tableId));
        assertEqualsInOrder(
                Arrays.asList(
                        "id | int | NO | true | null",
                        "kazu | double | YES | false | null",
                        "namae | varchar(51) | YES | false | null"),
                inspectTableSchema(tableId));
    }

    /** Widening {@code name} to VARCHAR(19) is expected to be reported as {@code varchar(57)}. */
    @Test
    @Ignore("Alter column type is not supported currently.")
    public void testStarRocksAlterColumnType() throws Exception {
        TableId tableId = testTableId();
        runJobWithEvents(generateAlterColumnTypeEvents(tableId));
        assertEqualsInOrder(
                Arrays.asList(
                        "id | int | NO | true | null",
                        "number | double | YES | false | null",
                        "name | varchar(57) | YES | false | null"),
                inspectTableSchema(tableId));
    }

    /** Narrowing DOUBLE to FLOAT is expected to fail the job. */
    @Test(expected = JobExecutionException.class)
    @Ignore("Alter column type is not supported currently.")
    public void testStarRocksNarrowingAlterColumnType() throws Exception {
        runJobWithEvents(generateNarrowingAlterColumnTypeEvents(testTableId()));
    }

    /**
     * Builds and executes a job that reads the given events from a collection source, routes
     * them through the schema operator in {@link SchemaChangeBehavior#EVOLVE} mode and writes
     * them to a StarRocks sink configured against the test container.
     *
     * @param events events to feed into the job, in order
     * @throws Exception if the job fails
     */
    private void runJobWithEvents(List<Event> events) throws Exception {
        DataStreamSource<Event> source = env.fromCollection(events, TypeInformation.of(Event.class));

        Configuration sinkConfig =
                new Configuration()
                        .set(StarRocksDataSinkOptions.LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
                        .set(StarRocksDataSinkOptions.JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
                        .set(StarRocksDataSinkOptions.USERNAME, StarRocksContainer.STARROCKS_USERNAME)
                        .set(StarRocksDataSinkOptions.PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
        DataSink sink = createStarRocksDataSink(sinkConfig);

        SchemaOperatorTranslator schemaOperatorTranslator =
                new SchemaOperatorTranslator(
                        SchemaChangeBehavior.EVOLVE,
                        "$$_schema_operator_$$",
                        PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
        OperatorIDGenerator schemaOperatorIdGenerator =
                new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

        new DataSinkTranslator()
                .translate(
                        new SinkDef("starrocks", "Dummy StarRocks Sink", sinkConfig),
                        schemaOperatorTranslator.translate(
                                source,
                                PARALLELISM,
                                // Accept every schema change type so each event reaches the applier.
                                sink.getMetadataApplier()
                                        .setAcceptedSchemaEvolutionTypes(
                                                Arrays.stream(SchemaChangeEventTypeFamily.ALL)
                                                        .collect(Collectors.toSet())),
                                new ArrayList<>()),
                        sink,
                        schemaOperatorIdGenerator.generate());

        env.execute("StarRocks Schema Evolution Test");
    }
}
