package org.apache.flink.connector.jdbc.table;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.class */
/**
 * Base integration test for writing into JDBC tables through the Flink Table API.
 *
 * <p>The class is abstract: the output table definitions are produced by protected
 * {@code create*OutputTable()} factory methods that subclasses may override, and every test
 * obtains its connection details from {@code getMetadata()}, which is not defined in this class
 * (presumably supplied through {@link DatabaseTest} - confirm against that interface).
 */
public abstract class JdbcDynamicTableSinkITCase extends AbstractTestBase implements DatabaseTest {
    // NOTE: these initializers invoke overridable factory methods. An override therefore runs
    // before the subclass's own field initializers and constructor body, so overrides must not
    // depend on subclass instance state.
    private final TableRow upsertOutputTable = createUpsertOutputTable();
    private final TableRow appendOutputTable = createAppendOutputTable();
    private final TableRow batchOutputTable = createBatchOutputTable();
    private final TableRow realOutputTable = createRealOutputTable();
    private final TableRow checkpointOutputTable = createCheckpointOutputTable();
    private final TableRow userOutputTable = createUserOutputTable();

    /**
     * Builds the definition of the table written by {@link #testUpsert()}.
     *
     * @return table {@code dynamicSinkForUpsert} with key fields {@code cnt} and {@code cTag}
     */
    protected TableRow createUpsertOutputTable() {
        return TableBuilder.tableRow(
                "dynamicSinkForUpsert",
                TableBuilder.pkField("cnt", DataTypes.BIGINT().notNull()),
                TableBuilder.field("lencnt", DataTypes.BIGINT().notNull()),
                TableBuilder.pkField("cTag", DataTypes.INT().notNull()),
                TableBuilder.field("ts", TableBuilder.dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
    }

    /**
     * Builds the definition of the table written by {@link #testAppend()}.
     *
     * @return table {@code dynamicSinkForAppend} with fields {@code id}, {@code num}, {@code ts}
     */
    protected TableRow createAppendOutputTable() {
        return TableBuilder.tableRow(
                "dynamicSinkForAppend",
                TableBuilder.field("id", DataTypes.INT().notNull()),
                TableBuilder.field("num", DataTypes.BIGINT().notNull()),
                TableBuilder.field("ts", TableBuilder.dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
    }

    /**
     * Builds the definition of the table written by {@link #testBatchSink()}.
     *
     * @return table {@code dynamicSinkForBatch} with fields {@code NAME} and {@code SCORE}
     */
    protected TableRow createBatchOutputTable() {
        return TableBuilder.tableRow(
                "dynamicSinkForBatch",
                TableBuilder.field("NAME", DataTypes.VARCHAR(20).notNull()),
                TableBuilder.field("SCORE", DataTypes.BIGINT().notNull()));
    }

    /**
     * Builds the definition of the table written by {@link #testReal()}.
     *
     * @return table {@code REAL_TABLE} with the single field {@code real_data}
     */
    protected TableRow createRealOutputTable() {
        return TableBuilder.tableRow(
                "REAL_TABLE",
                TableBuilder.field("real_data", TableBuilder.dbType("REAL"), DataTypes.FLOAT()));
    }

    /**
     * Builds the definition of the table written by {@link #testFlushBufferWhenCheckpoint()}.
     *
     * @return table {@code checkpointTable} with the single field {@code id}
     */
    protected TableRow createCheckpointOutputTable() {
        return TableBuilder.tableRow(
                "checkpointTable", TableBuilder.field("id", DataTypes.BIGINT().notNull()));
    }

    /**
     * Builds the definition of the table written by {@link #testReadingFromChangelogSource()}.
     *
     * @return table {@code USER_TABLE} keyed by {@code user_id}
     */
    protected TableRow createUserOutputTable() {
        return TableBuilder.tableRow(
                "USER_TABLE",
                TableBuilder.pkField("user_id", DataTypes.VARCHAR(20).notNull()),
                TableBuilder.field("user_name", DataTypes.VARCHAR(20).notNull()),
                TableBuilder.field("email", DataTypes.VARCHAR(255)),
                TableBuilder.field("balance", DataTypes.DECIMAL(18, 2)),
                TableBuilder.field("balance2", DataTypes.DECIMAL(18, 2)));
    }

    /**
     * Lists every output table used by the tests in this class.
     *
     * @return the six output tables, in declaration order
     */
    @Override // org.apache.flink.connector.jdbc.testutils.DatabaseTest
    public List<TableManaged> getManagedTables() {
        return Arrays.asList(
                this.upsertOutputTable,
                this.appendOutputTable,
                this.batchOutputTable,
                this.realOutputTable,
                this.checkpointOutputTable,
                this.userOutputTable);
    }

    /** Clears the data registered with {@link TestValuesTableFactory} after each test. */
    @AfterEach
    void afterEach() {
        TestValuesTableFactory.clearAllData();
    }

    /**
     * Rows that {@link #testReadingFromChangelogSource()} expects to find in the user table.
     *
     * @return expected rows as (user_id, user_name, email, balance, balance2)
     */
    protected List<Row> testUserData() {
        return Arrays.asList(
                Row.of("user1", "Tom", "tom123@gmail.com", new BigDecimal("8.10"), new BigDecimal("16.20")),
                Row.of("user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99"), new BigDecimal("19.98")),
                Row.of("user4", "Tina", "tina@gmail.com", new BigDecimal("11.30"), new BigDecimal("22.60")));
    }

    /**
     * Input rows shared by the streaming tests.
     *
     * @return 21 rows of (id, num, text, ts); ids run from 1 to 21 and the timestamp of row
     *     {@code id} is {@code id} milliseconds after {@code 1970-01-01 00:00:00}
     */
    protected List<Row> testData() {
        return Arrays.asList(
                Row.of(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")),
                Row.of(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")),
                Row.of(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")),
                Row.of(4, 3L, "Hello world, how are you?", Timestamp.valueOf("1970-01-01 00:00:00.004")),
                Row.of(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")),
                Row.of(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")),
                Row.of(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")),
                Row.of(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")),
                Row.of(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")),
                Row.of(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")),
                Row.of(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")),
                Row.of(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")),
                Row.of(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")),
                Row.of(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")),
                Row.of(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")),
                Row.of(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")),
                Row.of(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")),
                Row.of(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")),
                Row.of(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")),
                Row.of(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")),
                Row.of(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021")));
    }

    /**
     * Indexes {@link #testData()} by the value of its first field.
     *
     * @return map from the {@code id} field (position 0) to the full row
     */
    protected Map<Integer, Row> testDataMap() {
        return testData().stream()
                .collect(Collectors.toMap(row -> row.<Integer>getFieldAs(0), Function.identity()));
    }

    /**
     * Registers {@link #testData()} as a temporary view with columns id, num, text and ts.
     *
     * @param streamTableEnvironment environment in which the view is created
     * @param str name under which the view is registered
     */
    private void createTestDataTempView(StreamTableEnvironment streamTableEnvironment, String str) {
        streamTableEnvironment.createTemporaryView(
                str, streamTableEnvironment.fromValues(testData()).as("id", "num", "text", "ts"));
    }

    /** Writes a single FLOAT value through the sink and expects it back as {@code 1.0f}. */
    @Test
    void testReal() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(this.realOutputTable.getCreateQueryForFlink(getMetadata(), "realSink"));
        tEnv.executeSql(String.format("INSERT INTO %s SELECT CAST(1.0 as FLOAT)", "realSink"))
                .await();

        Assertions.assertThat(this.realOutputTable.selectAllTable(getMetadata()))
                .containsExactly(Row.of(1.0f));
    }

    /**
     * Runs a two-level aggregation over the test data into the upsert table and checks the three
     * rows that remain in the table once the job has finished.
     */
    @Test
    void testUpsert() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        createTestDataTempView(tEnv, "testData");
        tEnv.executeSql(
                this.upsertOutputTable.getCreateQueryForFlink(
                        getMetadata(),
                        "upsertSink",
                        Arrays.asList(
                                "'sink.buffer-flush.max-rows' = '2'",
                                "'sink.buffer-flush.interval' = '0'",
                                "'sink.max-retries' = '0'")));

        tEnv.executeSql(
                        String.format(
                                "INSERT INTO %s "
                                        + " SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts "
                                        + " FROM ( "
                                        + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts "
                                        + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM %s) "
                                        + "  GROUP BY len, cTag "
                                        + " ) "
                                        + " GROUP BY cnt, cTag",
                                "upsertSink",
                                "testData"))
                .await();

        // The expected timestamps are taken from the input rows with ids 6, 21 and 15.
        Map<Integer, Row> testDataMap = testDataMap();
        Assertions.assertThat(this.upsertOutputTable.selectAllTable(getMetadata()))
                .containsExactlyInAnyOrder(
                        Row.of(1L, 5L, 1, testDataMap.get(6).getField(3)),
                        Row.of(7L, 1L, 1, testDataMap.get(21).getField(3)),
                        Row.of(9L, 1L, 1, testDataMap.get(15).getField(3)));
    }

    /**
     * Inserts the input rows with ids 2, 10 and 20 into the append table and expects exactly
     * their (id, num, ts) projections to be stored.
     */
    @Test
    void testAppend() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.getConfig().setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        createTestDataTempView(tEnv, "testData");
        tEnv.executeSql(this.appendOutputTable.getCreateQueryForFlink(getMetadata(), "appendSink"));

        HashSet<Integer> selectedIds = new HashSet<>(Arrays.asList(2, 10, 20));
        String idList =
                selectedIds.stream().map(Object::toString).collect(Collectors.joining(","));
        tEnv.executeSql(
                        String.format(
                                "INSERT INTO %s SELECT id, num, ts FROM %s WHERE id IN (%s)",
                                "appendSink", "testData", idList))
                .await();

        // Project each selected input row onto the sink columns: id (0), num (1) and ts (3).
        Map<Integer, Row> testDataMap = testDataMap();
        List<Row> expected =
                selectedIds.stream()
                        .map(testDataMap::get)
                        .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(3)))
                        .collect(Collectors.toList());

        List<Row> actual = this.appendOutputTable.selectAllTable(getMetadata());
        Assertions.assertThat(actual).hasSize(3);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
    }

    /**
     * Inserts five literal rows in batch mode and expects all five, duplicates included, to be
     * present in the batch table.
     */
    @Test
    void testBatchSink() throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        tEnv.executeSql(
                this.batchOutputTable.getCreateQueryForFlink(
                        getMetadata(),
                        "batchSink",
                        Arrays.asList(
                                "'sink.buffer-flush.max-rows' = '2'",
                                "'sink.buffer-flush.interval' = '300ms'",
                                "'sink.max-retries' = '4'")));

        tEnv.executeSql(
                        String.format(
                                "INSERT INTO %s "
                                        + " SELECT user_name, score "
                                        + " FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) "
                                        + " AS UserCountTable(score, user_name) ",
                                "batchSink"))
                .await();

        Assertions.assertThat(this.batchOutputTable.selectAllTable(getMetadata()))
                .containsExactlyInAnyOrder(
                        Row.of("Bob", 1L),
                        Row.of("Tom", 22L),
                        Row.of("Kim", 42L),
                        Row.of("Kim", 42L),
                        Row.of("Bob", 1L));
    }

    /**
     * Copies a {@code values} changelog source (mode {@code I,UA,UB,D}) into the user table and
     * expects the table to end up holding {@link #testUserData()}.
     */
    @Test
    void testReadingFromChangelogSource() throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE %s ( "
                                + "  user_id STRING, "
                                + "  user_name STRING, "
                                + "  email STRING, "
                                + "  balance DECIMAL(18,2), "
                                + "  balance2 AS balance * 2 "
                                + ") WITH ( "
                                + " 'connector' = 'values', "
                                + " 'data-id' = '%s', "
                                + " 'changelog-mode' = 'I,UA,UB,D' "
                                + ")",
                        "user_logs",
                        dataId));

        tEnv.executeSql(
                this.userOutputTable.getCreateQueryForFlink(
                        getMetadata(),
                        "user_sink",
                        Arrays.asList(
                                "'sink.buffer-flush.max-rows' = '2'",
                                "'sink.buffer-flush.interval' = '0'")));

        tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM %s", "user_sink", "user_logs"))
                .await();

        Assertions.assertThat(this.userOutputTable.selectAllTable(getMetadata()))
                .containsExactlyInAnyOrderElementsOf(testUserData());
    }

    /**
     * Drives the sink function directly: two rows are invoked, the table is asserted to still be
     * empty, and only after {@code snapshotState} are both rows expected in the table.
     */
    @Test
    void testFlushBufferWhenCheckpoint() throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "jdbc");
        options.put("url", getMetadata().getJdbcUrl());
        options.put("username", getMetadata().getUsername());
        options.put("password", getMetadata().getPassword());
        options.put("table-name", this.checkpointOutputTable.getTableName());
        options.put("sink.buffer-flush.interval", "0");

        // NOTE(review): createSinkFunction() is invoked directly on the value returned by
        // getSinkRuntimeProvider(...). If that declared type does not expose the method, a cast
        // to the function-provider type is required here - confirm against the Flink API.
        GenericJdbcSinkFunction sinkFunction =
                FactoryMocks.createTableSink(
                                this.checkpointOutputTable.getTableResolvedSchema(), options)
                        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(false))
                        .createSinkFunction();
        sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
        sinkFunction.open(new Configuration());

        // Close the function even when an assertion fails so the opened sink is not leaked into
        // the tests that run afterwards.
        try {
            sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1L));
            sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1L));

            // Nothing may be visible in the table before the checkpoint is taken.
            Assertions.assertThat(this.checkpointOutputTable.selectAllTable(getMetadata()))
                    .isEmpty();

            sinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1L, 1L));

            Assertions.assertThat(this.checkpointOutputTable.selectAllTable(getMetadata()))
                    .containsExactlyInAnyOrder(Row.of(1L), Row.of(2L));
        } finally {
            sinkFunction.close();
        }
    }
}
