package org.apache.flink.connector.jdbc.table;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.class */
public class JdbcDynamicTableFactoryTest {
    private static final ResolvedSchema SCHEMA = new ResolvedSchema(Arrays.asList(Column.physical("aaa", DataTypes.INT().notNull()), Column.physical("bbb", DataTypes.STRING().notNull()), Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3))), Collections.emptyList(), UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));

    @Test
    public void testJdbcCommonProperties() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("driver", JdbcDynamicTableSourceITCase.DRIVER_CLASS);
        allOptions.put("username", "user");
        allOptions.put("password", "pass");
        allOptions.put("connection.max-retry-timeout", "120s");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, allOptions);
        JdbcConnectorOptions build = JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").setDriverName(JdbcDynamicTableSourceITCase.DRIVER_CLASS).setUsername("user").setPassword("pass").setConnectionCheckTimeoutSeconds(120).build();
        Assertions.assertThat(createTableSource).isEqualTo(new JdbcDynamicTableSource(build, JdbcReadOptions.builder().build(), ((Integer) LookupOptions.MAX_RETRIES.defaultValue()).intValue(), (LookupCache) null, SCHEMA.toPhysicalRowDataType()));
        Assertions.assertThat(FactoryMocks.createTableSink(SCHEMA, allOptions)).isEqualTo(new JdbcDynamicTableSink(build, JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(1000L).withMaxRetries(3).build(), JdbcDmlOptions.builder().withTableName(build.getTableName()).withDialect(build.getDialect()).withFieldNames((String[]) SCHEMA.getColumnNames().toArray(new String[0])).withKeyFields("bbb", new String[]{"aaa"}).build(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJdbcReadProperties() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("scan.partition.column", "aaa");
        allOptions.put("scan.partition.lower-bound", "-10");
        allOptions.put("scan.partition.upper-bound", "100");
        allOptions.put("scan.partition.num", "10");
        allOptions.put("scan.fetch-size", "20");
        allOptions.put("scan.auto-commit", "false");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, allOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new JdbcDynamicTableSource(JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").build(), JdbcReadOptions.builder().setPartitionColumnName("aaa").setPartitionLowerBound(-10L).setPartitionUpperBound(100L).setNumPartitions(10).setFetchSize(20).setAutoCommit(false).build(), ((Integer) LookupOptions.MAX_RETRIES.defaultValue()).intValue(), (LookupCache) null, SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJdbcLookupProperties() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("lookup.cache", "PARTIAL");
        allOptions.put("lookup.partial-cache.expire-after-write", "10s");
        allOptions.put("lookup.partial-cache.expire-after-access", "20s");
        allOptions.put("lookup.partial-cache.cache-missing-key", "false");
        allOptions.put("lookup.partial-cache.max-rows", "15213");
        allOptions.put("lookup.max-retries", "10");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, allOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new JdbcDynamicTableSource(JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").build(), JdbcReadOptions.builder().build(), 10, DefaultLookupCache.fromConfig(Configuration.fromMap(allOptions)), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJdbcLookupPropertiesWithLegacyOptions() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("lookup.cache.max-rows", "1000");
        allOptions.put("lookup.cache.ttl", "10s");
        allOptions.put("lookup.max-retries", "10");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, allOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new JdbcDynamicTableSource(JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").build(), JdbcReadOptions.builder().build(), 10, DefaultLookupCache.newBuilder().maximumSize(1000L).expireAfterWrite(Duration.ofSeconds(10L)).build(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJdbcSinkProperties() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("sink.buffer-flush.max-rows", "1000");
        allOptions.put("sink.buffer-flush.interval", "2min");
        allOptions.put("sink.max-retries", "5");
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(SCHEMA, allOptions);
        JdbcConnectorOptions build = JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").build();
        Assertions.assertThat(createTableSink).isEqualTo(new JdbcDynamicTableSink(build, JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(120000L).withMaxRetries(5).build(), JdbcDmlOptions.builder().withTableName(build.getTableName()).withDialect(build.getDialect()).withFieldNames((String[]) SCHEMA.getColumnNames().toArray(new String[0])).withKeyFields("bbb", new String[]{"aaa"}).build(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJDBCSinkWithParallelism() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("sink.parallelism", "2");
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(SCHEMA, allOptions);
        JdbcConnectorOptions build = JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").setParallelism(2).build();
        Assertions.assertThat(createTableSink).isEqualTo(new JdbcDynamicTableSink(build, JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(1000L).withMaxRetries(3).build(), JdbcDmlOptions.builder().withTableName(build.getTableName()).withDialect(build.getDialect()).withFieldNames((String[]) SCHEMA.getColumnNames().toArray(new String[0])).withKeyFields("bbb", new String[]{"aaa"}).build(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    public void testJdbcValidation() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("password", "pass");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions);
        }).hasStackTraceContaining("Either all or none of the following options should be provided:\nusername\npassword");
        Map<String, String> allOptions2 = getAllOptions();
        allOptions2.put("scan.partition.column", "aaa");
        allOptions2.put("scan.partition.lower-bound", "-10");
        allOptions2.put("scan.partition.upper-bound", "100");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions2);
        }).hasStackTraceContaining("Either all or none of the following options should be provided:\nscan.partition.column\nscan.partition.num\nscan.partition.lower-bound\nscan.partition.upper-bound");
        Map<String, String> allOptions3 = getAllOptions();
        allOptions3.put("scan.partition.column", "aaa");
        allOptions3.put("scan.partition.lower-bound", "100");
        allOptions3.put("scan.partition.upper-bound", "-10");
        allOptions3.put("scan.partition.num", "10");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions3);
        }).hasStackTraceContaining("'scan.partition.lower-bound'='100' must not be larger than 'scan.partition.upper-bound'='-10'.");
        Map<String, String> allOptions4 = getAllOptions();
        allOptions4.put("lookup.cache.max-rows", "10");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions4);
        }).hasStackTraceContaining("Either all or none of the following options should be provided:\nlookup.cache.max-rows\nlookup.cache.ttl");
        Map<String, String> allOptions5 = getAllOptions();
        allOptions5.put("lookup.cache.ttl", "1s");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions5);
        }).hasStackTraceContaining("Either all or none of the following options should be provided:\nlookup.cache.max-rows\nlookup.cache.ttl");
        Map<String, String> allOptions6 = getAllOptions();
        allOptions6.put("lookup.max-retries", "-1");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions6);
        }).hasStackTraceContaining("The value of 'lookup.max-retries' option shouldn't be negative, but is -1.");
        Map<String, String> allOptions7 = getAllOptions();
        allOptions7.put("sink.max-retries", "-1");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions7);
        }).hasStackTraceContaining("The value of 'sink.max-retries' option shouldn't be negative, but is -1.");
        Map<String, String> allOptions8 = getAllOptions();
        allOptions8.put("connection.max-retry-timeout", "100ms");
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, allOptions8);
        }).hasStackTraceContaining("The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.");
    }

    @Test
    public void testJdbcLookupPropertiesWithExcludeEmptyResult() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("lookup.cache.max-rows", "1000");
        allOptions.put("lookup.cache.ttl", "10s");
        allOptions.put("lookup.max-retries", "10");
        allOptions.put("lookup.cache.caching-missing-key", "true");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, allOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new JdbcDynamicTableSource(JdbcConnectorOptions.builder().setDBUrl("jdbc:derby:memory:mydb").setTableName("mytable").build(), JdbcReadOptions.builder().build(), 10, DefaultLookupCache.newBuilder().maximumSize(1000L).expireAfterWrite(Duration.ofSeconds(10L)).build(), SCHEMA.toPhysicalRowDataType()));
    }

    private Map<String, String> getAllOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", "jdbc");
        hashMap.put("url", "jdbc:derby:memory:mydb");
        hashMap.put("table-name", "mytable");
        return hashMap;
    }
}
