package org.apache.flink.connector.jdbc.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests how table options for the {@code jdbc} connector are turned into {@link
 * JdbcDynamicTableSource} and {@link JdbcDynamicTableSink} instances, and which option
 * combinations are rejected with a validation error.
 *
 * <p>Every test starts from the minimal option set returned by {@link #getAllOptions()} and adds
 * the options under test on top of it.
 */
public class JdbcDynamicTableFactoryTest {

    /** JDBC URL used by every test, both as table option and in the expected connector options. */
    private static final String DB_URL = "jdbc:derby:memory:mydb";

    /** Table name used by every test, both as table option and in the expected options. */
    private static final String TABLE_NAME = "mytable";

    /** Five physical columns with a primary key declared on {@code (bbb, aaa)}. */
    private static final ResolvedSchema SCHEMA =
            new ResolvedSchema(
                    Arrays.asList(
                            Column.physical("aaa", DataTypes.INT().notNull()),
                            Column.physical("bbb", DataTypes.STRING().notNull()),
                            Column.physical("ccc", DataTypes.DOUBLE()),
                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
                    Collections.emptyList(),
                    UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));

    /** Driver, credentials and connection timeout must reach both the source and the sink. */
    @Test
    public void testJdbcCommonProperties() {
        Map<String, String> options = getAllOptions();
        options.put("driver", JdbcDynamicTableSourceITCase.DRIVER_CLASS);
        options.put("username", "user");
        options.put("password", "pass");
        options.put("connection.max-retry-timeout", "120s");

        // "120s" is expected to surface as a connection check timeout of 120 seconds.
        JdbcConnectorOptions connectorOptions =
                JdbcConnectorOptions.builder()
                        .setDBUrl(DB_URL)
                        .setTableName(TABLE_NAME)
                        .setDriverName(JdbcDynamicTableSourceITCase.DRIVER_CLASS)
                        .setUsername("user")
                        .setPassword("pass")
                        .setConnectionCheckTimeoutSeconds(120)
                        .build();

        DynamicTableSource actualSource = FactoryMocks.createTableSource(SCHEMA, options);
        Assert.assertEquals(
                expectedSource(
                        connectorOptions,
                        JdbcReadOptions.builder().build(),
                        expectedLookupOptionsWhenUnset()),
                actualSource);

        DynamicTableSink actualSink = FactoryMocks.createTableSink(SCHEMA, options);
        Assert.assertEquals(
                expectedSink(connectorOptions, expectedExecutionOptionsWhenUnset()), actualSink);
    }

    /** All {@code scan.*} options must be carried over into the source's read options. */
    @Test
    public void testJdbcReadProperties() {
        Map<String, String> options = getAllOptions();
        options.put("scan.partition.column", "aaa");
        options.put("scan.partition.lower-bound", "-10");
        options.put("scan.partition.upper-bound", "100");
        options.put("scan.partition.num", "10");
        options.put("scan.fetch-size", "20");
        options.put("scan.auto-commit", "false");

        JdbcReadOptions readOptions =
                JdbcReadOptions.builder()
                        .setPartitionColumnName("aaa")
                        .setPartitionLowerBound(-10L)
                        .setPartitionUpperBound(100L)
                        .setNumPartitions(10)
                        .setFetchSize(20)
                        .setAutoCommit(false)
                        .build();

        Assert.assertEquals(
                expectedSource(
                        minimalConnectorOptions(), readOptions, expectedLookupOptionsWhenUnset()),
                FactoryMocks.createTableSource(SCHEMA, options));
    }

    /** All {@code lookup.*} cache/retry options must be carried over into the lookup options. */
    @Test
    public void testJdbcLookupProperties() {
        Map<String, String> options = getAllOptions();
        options.put("lookup.cache.max-rows", "1000");
        options.put("lookup.cache.ttl", "10s");
        options.put("lookup.max-retries", "10");

        // A TTL of "10s" is expected as 10000 milliseconds.
        JdbcLookupOptions lookupOptions =
                JdbcLookupOptions.builder()
                        .setCacheMaxSize(1000L)
                        .setCacheExpireMs(10000L)
                        .setMaxRetryTimes(10)
                        .build();

        Assert.assertEquals(
                expectedSource(
                        minimalConnectorOptions(),
                        JdbcReadOptions.builder().build(),
                        lookupOptions),
                FactoryMocks.createTableSource(SCHEMA, options));
    }

    /** All {@code sink.*} buffering/retry options must be carried over into the execution options. */
    @Test
    public void testJdbcSinkProperties() {
        Map<String, String> options = getAllOptions();
        options.put("sink.buffer-flush.max-rows", "1000");
        options.put("sink.buffer-flush.interval", "2min");
        options.put("sink.max-retries", "5");

        DynamicTableSink actualSink = FactoryMocks.createTableSink(SCHEMA, options);

        // A flush interval of "2min" is expected as 120000 milliseconds.
        JdbcExecutionOptions executionOptions =
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(120000L)
                        .withMaxRetries(5)
                        .build();

        Assert.assertEquals(expectedSink(minimalConnectorOptions(), executionOptions), actualSink);
    }

    /** {@code sink.parallelism} must end up in the connector options handed to the sink. */
    @Test
    public void testJDBCSinkWithParallelism() {
        Map<String, String> options = getAllOptions();
        options.put("sink.parallelism", "2");

        DynamicTableSink actualSink = FactoryMocks.createTableSink(SCHEMA, options);

        JdbcConnectorOptions connectorOptions =
                JdbcConnectorOptions.builder()
                        .setDBUrl(DB_URL)
                        .setTableName(TABLE_NAME)
                        .setParallelism(2)
                        .build();

        Assert.assertEquals(
                expectedSink(connectorOptions, expectedExecutionOptionsWhenUnset()), actualSink);
    }

    /**
     * Invalid or incomplete option combinations must be rejected with a descriptive message.
     *
     * <p>The cases run in sequence inside one test; the first failing case aborts the rest.
     */
    @Test
    public void testJdbcValidation() {
        // Password without username.
        Map<String, String> options = getAllOptions();
        options.put("password", "pass");
        assertValidationFails(
                options,
                "Either all or none of the following options should be provided:\nusername\npassword");

        // Partitioned scan with "scan.partition.num" missing.
        options = getAllOptions();
        options.put("scan.partition.column", "aaa");
        options.put("scan.partition.lower-bound", "-10");
        options.put("scan.partition.upper-bound", "100");
        assertValidationFails(
                options,
                "Either all or none of the following options should be provided:\nscan.partition.column\nscan.partition.num\nscan.partition.lower-bound\nscan.partition.upper-bound");

        // Lower bound larger than upper bound.
        options = getAllOptions();
        options.put("scan.partition.column", "aaa");
        options.put("scan.partition.lower-bound", "100");
        options.put("scan.partition.upper-bound", "-10");
        options.put("scan.partition.num", "10");
        assertValidationFails(
                options,
                "'scan.partition.lower-bound'='100' must not be larger than 'scan.partition.upper-bound'='-10'.");

        // Cache size without TTL.
        options = getAllOptions();
        options.put("lookup.cache.max-rows", "10");
        assertValidationFails(
                options,
                "Either all or none of the following options should be provided:\nlookup.cache.max-rows\nlookup.cache.ttl");

        // Cache TTL without size.
        options = getAllOptions();
        options.put("lookup.cache.ttl", "1s");
        assertValidationFails(
                options,
                "Either all or none of the following options should be provided:\nlookup.cache.max-rows\nlookup.cache.ttl");

        // Negative lookup retries.
        options = getAllOptions();
        options.put("lookup.max-retries", "-1");
        assertValidationFails(
                options,
                "The value of 'lookup.max-retries' option shouldn't be negative, but is -1.");

        // Negative sink retries (checked while creating a source, as before).
        options = getAllOptions();
        options.put("sink.max-retries", "-1");
        assertValidationFails(
                options,
                "The value of 'sink.max-retries' option shouldn't be negative, but is -1.");

        // Sub-second connection timeout.
        options = getAllOptions();
        options.put("connection.max-retry-timeout", "100ms");
        assertValidationFails(
                options,
                "The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.");
    }

    /** {@code lookup.cache.caching-missing-key} must be carried over into the lookup options. */
    @Test
    public void testJdbcLookupPropertiesWithExcludeEmptyResult() {
        Map<String, String> options = getAllOptions();
        options.put("lookup.cache.max-rows", "1000");
        options.put("lookup.cache.ttl", "10s");
        options.put("lookup.max-retries", "10");
        options.put("lookup.cache.caching-missing-key", "true");

        JdbcLookupOptions lookupOptions =
                JdbcLookupOptions.builder()
                        .setCacheMaxSize(1000L)
                        .setCacheExpireMs(10000L)
                        .setMaxRetryTimes(10)
                        .setCacheMissingKey(true)
                        .build();

        Assert.assertEquals(
                expectedSource(
                        minimalConnectorOptions(),
                        JdbcReadOptions.builder().build(),
                        lookupOptions),
                FactoryMocks.createTableSource(SCHEMA, options));
    }

    /**
     * Asserts that creating a table source from {@code options} throws, and that some throwable in
     * the resulting cause chain carries {@code expectedMessage}.
     *
     * <p>The "no exception" failure is raised outside the {@code try} block so that it cannot be
     * caught by the {@code catch (Throwable)} clause and reported as a misleading message mismatch.
     *
     * @param options table options expected to be rejected
     * @param expectedMessage message that must be found on the thrown exception or one of its causes
     */
    private static void assertValidationFails(Map<String, String> options, String expectedMessage) {
        try {
            FactoryMocks.createTableSource(SCHEMA, options);
        } catch (Throwable t) {
            Assert.assertTrue(
                    "Validation failed, but not with the expected message: " + expectedMessage,
                    ExceptionUtils.findThrowableWithMessage(t, expectedMessage).isPresent());
            return;
        }
        Assert.fail("exception expected");
    }

    /** Connector options expected when only the URL and the table name are configured. */
    private static JdbcConnectorOptions minimalConnectorOptions() {
        return JdbcConnectorOptions.builder().setDBUrl(DB_URL).setTableName(TABLE_NAME).build();
    }

    /** Lookup options the tests expect when no {@code lookup.*} option is supplied. */
    private static JdbcLookupOptions expectedLookupOptionsWhenUnset() {
        return JdbcLookupOptions.builder()
                .setCacheMaxSize(-1L)
                .setCacheExpireMs(10000L)
                .setMaxRetryTimes(3)
                .build();
    }

    /** Execution options the tests expect when no {@code sink.*} option is supplied. */
    private static JdbcExecutionOptions expectedExecutionOptionsWhenUnset() {
        return JdbcExecutionOptions.builder()
                .withBatchSize(100)
                .withBatchIntervalMs(1000L)
                .withMaxRetries(3)
                .build();
    }

    /** Builds the source the factory is expected to produce for {@link #SCHEMA}. */
    private static JdbcDynamicTableSource expectedSource(
            JdbcConnectorOptions connectorOptions,
            JdbcReadOptions readOptions,
            JdbcLookupOptions lookupOptions) {
        return new JdbcDynamicTableSource(
                connectorOptions, readOptions, lookupOptions, SCHEMA.toPhysicalRowDataType());
    }

    /**
     * Builds the sink the factory is expected to produce for {@link #SCHEMA}.
     *
     * <p>The DML options are derived from {@code connectorOptions} (table name, dialect) and from
     * the schema (all column names as fields, {@code bbb} and {@code aaa} as key fields).
     */
    private static JdbcDynamicTableSink expectedSink(
            JdbcConnectorOptions connectorOptions, JdbcExecutionOptions executionOptions) {
        JdbcDmlOptions dmlOptions =
                JdbcDmlOptions.builder()
                        .withTableName(connectorOptions.getTableName())
                        .withDialect(connectorOptions.getDialect())
                        .withFieldNames(SCHEMA.getColumnNames().toArray(new String[0]))
                        .withKeyFields("bbb", "aaa")
                        .build();
        return new JdbcDynamicTableSink(
                connectorOptions, executionOptions, dmlOptions, SCHEMA.toPhysicalRowDataType());
    }

    /**
     * Returns a fresh, mutable map holding the minimal option set shared by all tests: connector
     * identifier, JDBC URL and table name.
     */
    private Map<String, String> getAllOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "jdbc");
        options.put("url", DB_URL);
        options.put("table-name", TABLE_NAME);
        return options;
    }
}
