package org.apache.flink.cdc.connectors.starrocks.sink;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.class */
public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    @BeforeClass
    public static void before() {
        env.setParallelism(1);
        env.enableCheckpointing(3000L);
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @Before
    public void initializeDatabaseAndTable() {
        executeSql(String.format("CREATE DATABASE IF NOT EXISTS `%s`;", StarRocksContainer.STARROCKS_DATABASE_NAME));
        LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
        executeSql(String.format("CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");", StarRocksContainer.STARROCKS_DATABASE_NAME, StarRocksContainer.STARROCKS_TABLE_NAME, String.join(", ", Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)")), "id", "id"));
        LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
    }

    @After
    public void destroyDatabaseAndTable() {
        executeSql(String.format("DROP TABLE %s.%s;", StarRocksContainer.STARROCKS_DATABASE_NAME, StarRocksContainer.STARROCKS_TABLE_NAME));
        LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME);
        executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
        LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
    }

    private List<Event> generateEvents(TableId tableId) {
        Schema build = Schema.newBuilder().column(new PhysicalColumn("id", DataTypes.INT(), (String) null)).column(new PhysicalColumn("number", DataTypes.DOUBLE(), (String) null)).column(new PhysicalColumn("name", DataTypes.VARCHAR(17), (String) null)).primaryKey(new String[]{"id"}).build();
        BinaryRecordDataGenerator binaryRecordDataGenerator = new BinaryRecordDataGenerator(RowType.of(new DataType[]{DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)}));
        return Arrays.asList(new CreateTableEvent(tableId, build), DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{17, Double.valueOf(3.14d), BinaryStringData.fromString("StarRocks")})), DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{19, Double.valueOf(2.718d), BinaryStringData.fromString("Que Sera Sera")})), DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{21, Double.valueOf(1.732d), BinaryStringData.fromString("Disenchanted")})), DataChangeEvent.deleteEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{19, Double.valueOf(2.718d), BinaryStringData.fromString("Que Sera Sera")})), DataChangeEvent.updateEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{17, Double.valueOf(3.14d), BinaryStringData.fromString("StarRocks")}), binaryRecordDataGenerator.generate(new Object[]{17, Double.valueOf(6.28d), BinaryStringData.fromString("StarRocks")})));
    }

    @Test
    public void testValuesToStarRocks() throws Exception {
        TableId tableId = TableId.tableId(StarRocksContainer.STARROCKS_DATABASE_NAME, StarRocksContainer.STARROCKS_TABLE_NAME);
        env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class)).sinkTo(createStarRocksDataSink(new Configuration().set(StarRocksDataSinkOptions.LOAD_URL, STARROCKS_CONTAINER.getLoadUrl()).set(StarRocksDataSinkOptions.JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl()).set(StarRocksDataSinkOptions.USERNAME, StarRocksContainer.STARROCKS_USERNAME).set(StarRocksDataSinkOptions.PASSWORD, StarRocksContainer.STARROCKS_PASSWORD)).getEventSinkProvider().getSink());
        env.execute("Values to StarRocks Sink");
        assertEqualsInAnyOrder(Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted"), fetchTableContent(tableId, 3));
    }
}
