package org.apache.flink.cdc.connectors.oracle.table;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.class */
public class OracleTableSourceFactoryTest {
    private static final int MY_PORT = 1521;
    private static final String MY_LOCALHOST = "localhost";
    private static final String MY_USERNAME = "flinkuser";
    private static final String MY_PASSWORD = "flinkpw";
    private static final String MY_DATABASE = "MYDB";
    private static final String MY_TABLE = "myTable";
    private static final String MY_SCHEMA = "mySchema";
    private static final ResolvedSchema SCHEMA = new ResolvedSchema(Arrays.asList(Column.physical("aaa", DataTypes.INT().notNull()), Column.physical("bbb", DataTypes.STRING().notNull()), Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3))), new ArrayList(), UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
    private static final ResolvedSchema SCHEMA_WITH_METADATA = new ResolvedSchema(Arrays.asList(Column.physical("id", DataTypes.BIGINT().notNull()), Column.physical("name", DataTypes.STRING()), Column.physical("count", DataTypes.DECIMAL(38, 18)), Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), Column.metadata("database_name", DataTypes.STRING(), "database_name", true), Column.metadata("table_name", DataTypes.STRING(), "table_name", true), Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true)), Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
    private static final Properties PROPERTIES = new Properties();

    @Test
    public void testRequiredProperties() {
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, PROPERTIES, StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue()), createTableSource(getAllRequiredOptions()));
    }

    @Test
    public void testCommonProperties() {
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, PROPERTIES, StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue()), createTableSource(getAllRequiredOptionsWithHost()));
    }

    @Test
    public void testOptionalProperties() {
        Map<String, String> allRequiredOptions = getAllRequiredOptions();
        allRequiredOptions.put("port", "1521");
        allRequiredOptions.put("hostname", MY_LOCALHOST);
        allRequiredOptions.put("debezium.snapshot.mode", "initial");
        DynamicTableSource createTableSource = createTableSource(allRequiredOptions);
        Properties properties = new Properties();
        properties.put("snapshot.mode", "initial");
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, properties, StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue()), createTableSource);
    }

    @Test
    public void testScanIncrementalProperties() {
        Map<String, String> allRequiredOptions = getAllRequiredOptions();
        Duration ofSeconds = Duration.ofSeconds(60L);
        allRequiredOptions.put("port", "1521");
        allRequiredOptions.put("hostname", MY_LOCALHOST);
        allRequiredOptions.put("debezium.snapshot.mode", "initial");
        allRequiredOptions.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
        allRequiredOptions.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.key(), String.valueOf(4096));
        allRequiredOptions.put(SourceOptions.CHUNK_META_GROUP_SIZE.key(), String.valueOf(2048));
        allRequiredOptions.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(1024));
        allRequiredOptions.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true");
        allRequiredOptions.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(), "true");
        allRequiredOptions.put(JdbcSourceOptions.CONNECT_TIMEOUT.key(), String.format("%ds", Long.valueOf(ofSeconds.getSeconds())));
        allRequiredOptions.put(JdbcSourceOptions.CONNECT_MAX_RETRIES.key(), String.valueOf(5));
        allRequiredOptions.put(JdbcSourceOptions.CONNECTION_POOL_SIZE.key(), String.valueOf(10));
        allRequiredOptions.put(JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), String.valueOf(40.5d));
        allRequiredOptions.put(JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), String.valueOf(0.01d));
        DynamicTableSource createTableSource = createTableSource(allRequiredOptions);
        Properties properties = new Properties();
        properties.put("snapshot.mode", "initial");
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, properties, StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), 4096, 2048, 1024, ofSeconds, 5, 10, 40.5d, 0.01d, (String) null, true, true), createTableSource);
    }

    @Test
    public void testStartupFromInitial() {
        Map<String, String> allRequiredOptionsWithHost = getAllRequiredOptionsWithHost();
        allRequiredOptionsWithHost.put("scan.startup.mode", "initial");
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, PROPERTIES, StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue()), createTableSource(allRequiredOptionsWithHost));
    }

    @Test
    public void testStartupFromLatestOffset() {
        Map<String, String> allRequiredOptionsWithHost = getAllRequiredOptionsWithHost();
        allRequiredOptionsWithHost.put("scan.startup.mode", "latest-offset");
        Assert.assertEquals(new OracleTableSource(SCHEMA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, PROPERTIES, StartupOptions.latest(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue()), createTableSource(allRequiredOptionsWithHost));
    }

    @Test
    public void testMetadataColumns() {
        OracleTableSource createTableSource = createTableSource(SCHEMA_WITH_METADATA, getAllRequiredOptions());
        createTableSource.applyReadableMetadata(Arrays.asList("op_ts", "database_name", "table_name", "schema_name"), SCHEMA_WITH_METADATA.toSourceRowDataType());
        DynamicTableSource copy = createTableSource.copy();
        OracleTableSource oracleTableSource = new OracleTableSource(SCHEMA_WITH_METADATA, (String) null, MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, new Properties(), StartupOptions.initial(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue()).booleanValue(), ((Integer) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue()).intValue(), ((Integer) SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue()).intValue(), (Duration) JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue(), ((Integer) JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue()).intValue(), ((Integer) JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue()).intValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue()).doubleValue(), ((Double) JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue()).doubleValue(), (String) null, ((Boolean) SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()).booleanValue(), ((Boolean) SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()).booleanValue());
        oracleTableSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
        oracleTableSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name", "schema_name");
        Assert.assertEquals(oracleTableSource, copy);
    }

    @Test
    public void testValidation() {
        try {
            Map<String, String> allRequiredOptionsWithHost = getAllRequiredOptionsWithHost();
            allRequiredOptionsWithHost.put("port", "123b");
            createTableSource(allRequiredOptionsWithHost);
            Assert.fail("exception expected");
        } catch (Throwable th) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th, "Could not parse value '123b' for key 'port'.").isPresent());
        }
        for (ConfigOption configOption : new OracleTableSourceFactory().requiredOptions()) {
            Map<String, String> allRequiredOptionsWithHost2 = getAllRequiredOptionsWithHost();
            allRequiredOptionsWithHost2.remove(configOption.key());
            try {
                createTableSource(allRequiredOptionsWithHost2);
                Assert.fail("exception expected");
            } catch (Throwable th2) {
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th2, "Missing required options are:\n\n" + configOption.key()).isPresent());
            }
        }
        try {
            Map<String, String> allRequiredOptionsWithHost3 = getAllRequiredOptionsWithHost();
            allRequiredOptionsWithHost3.put("unknown", "abc");
            createTableSource(allRequiredOptionsWithHost3);
            Assert.fail("exception expected");
        } catch (Throwable th3) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th3, "Unsupported options:\n\nunknown").isPresent());
        }
        try {
            Map<String, String> allRequiredOptionsWithHost4 = getAllRequiredOptionsWithHost();
            allRequiredOptionsWithHost4.put("scan.startup.mode", "abc");
            createTableSource(allRequiredOptionsWithHost4);
            Assert.fail("exception expected");
        } catch (Throwable th4) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th4, "Invalid value for option 'scan.startup.mode'. Supported values are [initial, snapshot, latest-offset], but was: abc").isPresent());
        }
    }

    private Map<String, String> getAllRequiredOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", "oracle-cdc");
        hashMap.put("port", "1521");
        hashMap.put("hostname", MY_LOCALHOST);
        hashMap.put("database-name", MY_DATABASE);
        hashMap.put("table-name", MY_TABLE);
        hashMap.put("username", MY_USERNAME);
        hashMap.put("password", MY_PASSWORD);
        hashMap.put("schema-name", MY_SCHEMA);
        return hashMap;
    }

    private Map<String, String> getAllRequiredOptionsWithHost() {
        Map<String, String> allRequiredOptions = getAllRequiredOptions();
        allRequiredOptions.put("hostname", MY_LOCALHOST);
        return allRequiredOptions;
    }

    private static DynamicTableSource createTableSource(Map<String, String> map) {
        return createTableSource(SCHEMA, map);
    }

    private static DynamicTableSource createTableSource(ResolvedSchema resolvedSchema, Map<String, String> map) {
        return FactoryUtil.createTableSource((Catalog) null, ObjectIdentifier.of("default", "default", "t1"), new ResolvedCatalogTable(CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), "mock source", new ArrayList(), map), resolvedSchema), new Configuration(), OracleTableSourceFactoryTest.class.getClassLoader(), false);
    }
}
