package org.apache.flink.connector.mongodb.table;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.class */
class MongoDynamicTableFactoryTest {
    private static final ResolvedSchema SCHEMA = new ResolvedSchema(Arrays.asList(Column.physical("aaa", DataTypes.INT().notNull()), Column.physical("bbb", DataTypes.STRING().notNull()), Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3))), Collections.emptyList(), UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));

    MongoDynamicTableFactoryTest() {
    }

    @Test
    void testMongoSourceCommonProperties() {
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, getRequiredOptions());
        Assertions.assertThat(createTableSource).isEqualTo(new MongoDynamicTableSource(getConnectionOptions(), MongoReadOptions.builder().build(), (LookupCache) null, ((Integer) LookupOptions.MAX_RETRIES.defaultValue()).intValue(), ((Duration) MongoConnectorOptions.LOOKUP_RETRY_INTERVAL.defaultValue()).toMillis(), (FilterHandlingPolicy) MongoConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    void testMongoSinkCommonProperties() {
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(SCHEMA, getRequiredOptions());
        Assertions.assertThat(createTableSink).isEqualTo(new MongoDynamicTableSink(getConnectionOptions(), MongoWriteOptions.builder().build(), (Integer) null, SCHEMA, new String[0]));
    }

    @Test
    void testMongoReadProperties() {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put(MongoConnectorOptions.SCAN_FETCH_SIZE.key(), "1024");
        requiredOptions.put(MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT.key(), "false");
        requiredOptions.put(MongoConnectorOptions.SCAN_PARTITION_STRATEGY.key(), "split-vector");
        requiredOptions.put(MongoConnectorOptions.SCAN_PARTITION_SIZE.key(), "128m");
        requiredOptions.put(MongoConnectorOptions.SCAN_PARTITION_SAMPLES.key(), "5");
        requiredOptions.put(MongoConnectorOptions.FILTER_HANDLING_POLICY.key(), "never");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, requiredOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new MongoDynamicTableSource(getConnectionOptions(), MongoReadOptions.builder().setFetchSize(1024).setNoCursorTimeout(false).setPartitionStrategy(PartitionStrategy.SPLIT_VECTOR).setPartitionSize(MemorySize.ofMebiBytes(128L)).setSamplesPerPartition(5).build(), (LookupCache) null, ((Integer) LookupOptions.MAX_RETRIES.defaultValue()).intValue(), ((Duration) MongoConnectorOptions.LOOKUP_RETRY_INTERVAL.defaultValue()).toMillis(), FilterHandlingPolicy.NEVER, SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    void testMongoPaginationPartitionProperties() {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put(MongoConnectorOptions.SCAN_PARTITION_STRATEGY.key(), "pagination");
        requiredOptions.put(MongoConnectorOptions.SCAN_PARTITION_RECORD_SIZE.key(), "42");
        requiredOptions.put(MongoConnectorOptions.FILTER_HANDLING_POLICY.key(), "never");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, requiredOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new MongoDynamicTableSource(getConnectionOptions(), MongoReadOptions.builder().setPartitionStrategy(PartitionStrategy.PAGINATION).setPartitionRecordSize(42).build(), (LookupCache) null, ((Integer) LookupOptions.MAX_RETRIES.defaultValue()).intValue(), ((Duration) MongoConnectorOptions.LOOKUP_RETRY_INTERVAL.defaultValue()).toMillis(), FilterHandlingPolicy.NEVER, SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    void testMongoLookupProperties() {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
        requiredOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10s");
        requiredOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "20s");
        requiredOptions.put(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY.key(), "false");
        requiredOptions.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "15213");
        requiredOptions.put(LookupOptions.MAX_RETRIES.key(), "10");
        requiredOptions.put(MongoConnectorOptions.LOOKUP_RETRY_INTERVAL.key(), "20ms");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(SCHEMA, requiredOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new MongoDynamicTableSource(getConnectionOptions(), MongoReadOptions.builder().build(), DefaultLookupCache.fromConfig(Configuration.fromMap(requiredOptions)), 10, 20L, (FilterHandlingPolicy) MongoConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(), SCHEMA.toPhysicalRowDataType()));
    }

    @Test
    void testMongoSinkProperties() {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put(MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS.key(), "1001");
        requiredOptions.put(MongoConnectorOptions.BUFFER_FLUSH_INTERVAL.key(), "2min");
        requiredOptions.put(MongoConnectorOptions.DELIVERY_GUARANTEE.key(), "at-least-once");
        requiredOptions.put(MongoConnectorOptions.SINK_MAX_RETRIES.key(), "5");
        requiredOptions.put(MongoConnectorOptions.SINK_RETRY_INTERVAL.key(), "2s");
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(SCHEMA, requiredOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new MongoDynamicTableSink(getConnectionOptions(), MongoWriteOptions.builder().setBatchSize(1001).setBatchIntervalMs(TimeUnit.MINUTES.toMillis(2L)).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setMaxRetries(5).setRetryIntervalMs(TimeUnit.SECONDS.toMillis(2L)).build(), (Integer) null, SCHEMA, new String[0]));
    }

    @Test
    void testMongoSinkWithParallelism() {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put("sink.parallelism", "2");
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(SCHEMA, requiredOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new MongoDynamicTableSink(getConnectionOptions(), MongoWriteOptions.builder().build(), 2, SCHEMA, new String[0]));
    }

    @Test
    void testMongoValidation() {
        assertSourceValidationRejects(MongoConnectorOptions.SCAN_FETCH_SIZE.key(), "0", "The fetch size must be larger than 0.");
        assertSourceValidationRejects(MongoConnectorOptions.SCAN_PARTITION_SIZE.key(), "900kb", "The partition size must be larger than or equal to 1mb.");
        assertSourceValidationRejects(MongoConnectorOptions.SCAN_PARTITION_SAMPLES.key(), "0", "The samples per partition must be larger than 0.");
        assertSourceValidationRejects(LookupOptions.MAX_RETRIES.key(), "-1", "The 'lookup.max-retries' must be larger than or equal to 0.");
        assertSourceValidationRejects(MongoConnectorOptions.LOOKUP_RETRY_INTERVAL.key(), "0ms", "The 'lookup.retry.interval' must be larger than 0.");
        assertSourceValidationRejects(MongoConnectorOptions.SCAN_PARTITION_RECORD_SIZE.key(), "0", "The record size per partition must be larger than 0.");
        assertSinkValidationRejects(MongoConnectorOptions.SINK_MAX_RETRIES.key(), "-1", "The sink max retry times must be larger than or equal to 0.");
        assertSinkValidationRejects(MongoConnectorOptions.SINK_RETRY_INTERVAL.key(), "0ms", "The retry interval (in milliseconds) must be larger than 0.");
        assertSinkValidationRejects(MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS.key(), "0", "Max number of batch size must be larger than 0.");
        assertSinkValidationRejects(MongoConnectorOptions.DELIVERY_GUARANTEE.key(), "exactly-once", "Mongo sink does not support the EXACTLY_ONCE guarantee.");
    }

    private void assertSourceValidationRejects(String str, String str2, String str3) {
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(SCHEMA, getRequiredOptionsWithSetting(str, str2));
        }).hasStackTraceContaining(str3);
    }

    private void assertSinkValidationRejects(String str, String str2, String str3) {
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSink(SCHEMA, getRequiredOptionsWithSetting(str, str2));
        }).hasStackTraceContaining(str3);
    }

    private static Map<String, String> getRequiredOptionsWithSetting(String str, String str2) {
        Map<String, String> requiredOptions = getRequiredOptions();
        requiredOptions.put(str, str2);
        return requiredOptions;
    }

    private static Map<String, String> getRequiredOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put(FactoryUtil.CONNECTOR.key(), MongoTestUtil.MONGODB_HOSTNAME);
        hashMap.put(MongoConnectorOptions.URI.key(), "mongodb://127.0.0.1:27017");
        hashMap.put(MongoConnectorOptions.DATABASE.key(), "test_db");
        hashMap.put(MongoConnectorOptions.COLLECTION.key(), "test_coll");
        return hashMap;
    }

    private static MongoConnectionOptions getConnectionOptions() {
        return MongoConnectionOptions.builder().setUri("mongodb://127.0.0.1:27017").setDatabase("test_db").setCollection("test_coll").build();
    }
}
