package org.apache.flink.table.factories;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.TestManagedTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestCatalogFactory;
import org.apache.flink.table.factories.TestCatalogStoreFactory;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.MutableURLClassLoader;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/factories/FactoryUtilTest.class */
class FactoryUtilTest {

    /* loaded from: input_file:org/apache/flink/table/factories/FactoryUtilTest$TestFactoryWithMap.class */
    private static class TestFactoryWithMap implements DynamicTableFactory {
        public static final ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key("properties").mapType().noDefaultValue();

        private TestFactoryWithMap() {
        }

        public String factoryIdentifier() {
            return "test-factory-with-map";
        }

        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        public Set<ConfigOption<?>> optionalOptions() {
            return Collections.singleton(PROPERTIES);
        }
    }

    FactoryUtilTest() {
    }

    @Test
    void testManagedConnector() {
        Map<String, String> createAllOptions = createAllOptions();
        createAllOptions.remove("connector");
        Assertions.assertThat(FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions)).isExactlyInstanceOf(TestManagedTableSource.class);
    }

    @Test
    void testInvalidConnector() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Could not find any factory for identifier 'FAIL' that implements '" + DynamicTableFactory.class.getName() + "' in the classpath.\n\nAvailable factory identifiers are:\n\nconflicting\nsink-only\nsource-only\ntest\ntest-connector");
    }

    @Test
    void testConflictingConnector() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Multiple factories for identifier 'conflicting' that implement '" + DynamicTableFactory.class.getName() + "' found in the classpath.\n\nAmbiguous factory classes are:\n\n" + TestConflictingDynamicTableFactory1.class.getName() + "\n" + TestConflictingDynamicTableFactory2.class.getName());
    }

    @Test
    void testMissingConnectorOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "One or more required options are missing.\n\nMissing required options are:\n\ntarget");
    }

    @Test
    void testInvalidConnectorOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Invalid value for option 'buffer-size'.");
    }

    @Test
    void testMissingFormat() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Could not find required scan format 'value.format'.");
    }

    @Test
    void testInvalidFormat() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Could not find any factory for identifier 'FAIL' that implements '" + DeserializationFormatFactory.class.getName() + "' in the classpath.\n\nAvailable factory identifiers are:\n\ntest-format");
    }

    @Test
    void testMissingFormatOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "One or more required options are missing.\n\nMissing required options are:\n\ndelimiter", "Error creating scan format 'test-format' in option space 'key.test-format.'.");
    }

    @Test
    void testInvalidFormatOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
        }, "Invalid value for option 'fail-on-missing'.");
    }

    @Test
    void testSecretOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
            map.remove("target");
            map.put("password", "123");
        }, "Table options are:\n\n'buffer-size'='1000'\n'connector'='test-connector'\n'key.format'='test-format'\n'key.test-format.delimiter'=','\n'password'='******'\n'property-version'='1'\n'value.format'='test-format'\n'value.test-format.delimiter'='|'\n'value.test-format.fail-on-missing'='true'");
    }

    @Test
    void testUnconsumedOption() {
        assertCreateTableSourceWithOptionModifier(map -> {
            map.put("this-is-not-consumed", "42");
            map.put("this-is-also-not-consumed", "true");
        }, "Unsupported options found for 'test-connector'.\n\nUnsupported options:\n\nthis-is-also-not-consumed\nthis-is-not-consumed\n\nSupported options:\n\nbuffer-size\nconnector\ndeprecated-target (deprecated)\nfallback-buffer-size\nformat\nkey.format\nkey.test-format.changelog-mode\nkey.test-format.delimiter\nkey.test-format.deprecated-delimiter (deprecated)\nkey.test-format.fail-on-missing\nkey.test-format.fallback-fail-on-missing\nkey.test-format.readable-metadata\npassword\nproperty-version\nscan.watermark.alignment.group\nscan.watermark.alignment.max-drift\nscan.watermark.alignment.update-interval\nscan.watermark.emit.strategy\nscan.watermark.idle-timeout\ntarget\nvalue.format\nvalue.test-format.changelog-mode\nvalue.test-format.delimiter\nvalue.test-format.deprecated-delimiter (deprecated)\nvalue.test-format.fail-on-missing\nvalue.test-format.fallback-fail-on-missing\nvalue.test-format.readable-metadata");
    }

    @Test
    void testWatermarkEmitOptions() {
        Map<String, String> createWatermarkOptions = createWatermarkOptions();
        assertCreateTableSourceWithOptionModifier(map -> {
            map.putAll(createWatermarkOptions);
            map.put(FactoryUtil.WATERMARK_EMIT_STRATEGY.key(), "test_strategy");
        }, "Invalid value for option 'scan.watermark.emit.strategy'.");
    }

    @Test
    void testWatermarkAlignmentOptions() {
        Map<String, String> createWatermarkOptions = createWatermarkOptions();
        assertCreateTableSourceWithOptionModifier(map -> {
            map.putAll(createWatermarkOptions);
            map.remove(FactoryUtil.WATERMARK_ALIGNMENT_GROUP.key());
        }, "Error configuring watermark for 'test-connector', 'scan.watermark.alignment.group' and 'scan.watermark.alignment.max-drift' must be set when configuring watermark alignment");
        assertCreateTableSourceWithOptionModifier(map2 -> {
            map2.putAll(createWatermarkOptions);
            map2.remove(FactoryUtil.WATERMARK_ALIGNMENT_MAX_DRIFT.key());
        }, "Error configuring watermark for 'test-connector', 'scan.watermark.alignment.group' and 'scan.watermark.alignment.max-drift' must be set when configuring watermark alignment");
    }

    @Test
    void testAllOptions() {
        Map<String, String> createAllOptions = createAllOptions();
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new TestDynamicTableFactory.DynamicTableSourceMock("MyTarget", null, new TestFormatFactory.DecodingFormatMock(",", false), new TestFormatFactory.DecodingFormatMock("|", true)));
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new TestDynamicTableFactory.DynamicTableSinkMock("MyTarget", 1000L, new TestFormatFactory.EncodingFormatMock(","), new TestFormatFactory.EncodingFormatMock("|")));
    }

    @Test
    void testDiscoveryForSeparateSourceSinkFactory() {
        Map<String, String> createAllOptions = createAllOptions();
        createAllOptions.put("connector", "test");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new TestDynamicTableFactory.DynamicTableSourceMock("MyTarget", null, new TestFormatFactory.DecodingFormatMock(",", false), new TestFormatFactory.DecodingFormatMock("|", true)));
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new TestDynamicTableFactory.DynamicTableSinkMock("MyTarget", 1000L, new TestFormatFactory.EncodingFormatMock(","), new TestFormatFactory.EncodingFormatMock("|")));
    }

    @Test
    void testOptionalFormat() {
        Map<String, String> createAllOptions = createAllOptions();
        createAllOptions.remove("key.format");
        createAllOptions.remove("key.test-format.delimiter");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new TestDynamicTableFactory.DynamicTableSourceMock("MyTarget", null, null, new TestFormatFactory.DecodingFormatMock("|", true)));
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new TestDynamicTableFactory.DynamicTableSinkMock("MyTarget", 1000L, null, new TestFormatFactory.EncodingFormatMock("|")));
    }

    @Test
    void testAlternativeValueFormat() {
        Map<String, String> createAllOptions = createAllOptions();
        createAllOptions.remove("value.format");
        createAllOptions.remove("value.test-format.delimiter");
        createAllOptions.remove("value.test-format.fail-on-missing");
        createAllOptions.put("format", TestFormatFactory.IDENTIFIER);
        createAllOptions.put("test-format.delimiter", ";");
        createAllOptions.put("test-format.fail-on-missing", "true");
        DynamicTableSource createTableSource = FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSource).isEqualTo(new TestDynamicTableFactory.DynamicTableSourceMock("MyTarget", null, new TestFormatFactory.DecodingFormatMock(",", false), new TestFormatFactory.DecodingFormatMock(";", true)));
        DynamicTableSink createTableSink = FactoryMocks.createTableSink(FactoryMocks.SCHEMA, createAllOptions);
        Assertions.assertThat(createTableSink).isEqualTo(new TestDynamicTableFactory.DynamicTableSinkMock("MyTarget", 1000L, new TestFormatFactory.EncodingFormatMock(","), new TestFormatFactory.EncodingFormatMock(";")));
    }

    @Test
    void testConnectorErrorHint() {
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSource(FactoryMocks.SCHEMA, Collections.singletonMap("connector", TestDynamicTableSinkOnlyFactory.IDENTIFIER));
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, "Connector 'sink-only' can only be used as a sink. It cannot be used as a source.")});
        Assertions.assertThatThrownBy(() -> {
            FactoryMocks.createTableSink(FactoryMocks.SCHEMA, Collections.singletonMap("connector", TestDynamicTableSourceOnlyFactory.IDENTIFIER));
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, "Connector 'source-only' can only be used as a source. It cannot be used as a sink.")});
    }

    @Test
    void testRequiredPlaceholderOption() {
        HashSet hashSet = new HashSet();
        hashSet.add(ConfigOptions.key("fields.#.min").intType().noDefaultValue());
        hashSet.add(ConfigOptions.key("no.placeholder.anymore").intType().noDefaultValue().withFallbackKeys(new String[]{"old.fields.#.min"}));
        FactoryUtil.validateFactoryOptions(hashSet, new HashSet(), new Configuration());
    }

    @Test
    void testCreateCatalog() {
        HashMap hashMap = new HashMap();
        hashMap.put(CommonCatalogOptions.CATALOG_TYPE.key(), "test-catalog");
        hashMap.put(TestCatalogFactory.DEFAULT_DATABASE.key(), "my-database");
        Catalog createCatalog = FactoryUtil.createCatalog("my-catalog", hashMap, (ReadableConfig) null, Thread.currentThread().getContextClassLoader());
        Assertions.assertThat(createCatalog).isInstanceOf(TestCatalogFactory.TestCatalog.class);
        TestCatalogFactory.TestCatalog testCatalog = (TestCatalogFactory.TestCatalog) createCatalog;
        Assertions.assertThat("my-catalog").isEqualTo(testCatalog.getName());
        Assertions.assertThat("my-database").isEqualTo(testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()));
    }

    @Test
    void testCatalogFactoryHelper() {
        FactoryUtil.createCatalogFactoryHelper(new TestCatalogFactory(), new FactoryUtil.DefaultCatalogContext("test", Collections.emptyMap(), (ReadableConfig) null, Thread.currentThread().getContextClassLoader())).validate();
        FactoryUtil.CatalogFactoryHelper createCatalogFactoryHelper = FactoryUtil.createCatalogFactoryHelper(new TestCatalogFactory(), new FactoryUtil.DefaultCatalogContext("test", Collections.singletonMap("x", "y"), (ReadableConfig) null, Thread.currentThread().getContextClassLoader()));
        createCatalogFactoryHelper.getClass();
        Assertions.assertThatThrownBy(createCatalogFactoryHelper::validate).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, "Unsupported options found for 'test-catalog'")});
    }

    @Test
    void testCreateCatalogStore() {
        HashMap hashMap = new HashMap();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        FactoryUtil.DefaultCatalogStoreContext defaultCatalogStoreContext = new FactoryUtil.DefaultCatalogStoreContext(hashMap, (ReadableConfig) null, contextClassLoader);
        CatalogStoreFactory discoverFactory = FactoryUtil.discoverFactory(contextClassLoader, CatalogStoreFactory.class, TestCatalogStoreFactory.IDENTIFIER);
        discoverFactory.open(defaultCatalogStoreContext);
        Assertions.assertThat(discoverFactory.createCatalogStore()).isInstanceOf(TestCatalogStoreFactory.TestCatalogStore.class);
    }

    @Test
    void testFactoryHelperWithDeprecatedOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("deprecated-target", "MyTarget");
        hashMap.put("fallback-buffer-size", "1000");
        hashMap.put("value.format", TestFormatFactory.IDENTIFIER);
        hashMap.put("value.test-format.deprecated-delimiter", "|");
        hashMap.put("value.test-format.fallback-fail-on-missing", "true");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap));
        createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, TestDynamicTableFactory.VALUE_FORMAT);
        createTableFactoryHelper.validate();
    }

    @Test
    void testFactoryHelperWithEnrichmentOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put(TestDynamicTableFactory.TARGET.key(), "abc");
        hashMap.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        hashMap2.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap, hashMap2));
        createTableFactoryHelper.validate();
        Assertions.assertThat((String) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.TARGET)).isEqualTo("abc");
        Assertions.assertThat((Long) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE)).isEqualTo(2000L);
    }

    @Test
    void testFactoryHelperWithEnrichmentOptionsAndFormat() {
        String formatPrefix = FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, TestFormatFactory.IDENTIFIER);
        String formatPrefix2 = FactoryUtil.getFormatPrefix(TestDynamicTableFactory.VALUE_FORMAT, TestFormatFactory.IDENTIFIER);
        HashMap hashMap = new HashMap();
        hashMap.put(TestDynamicTableFactory.TARGET.key(), "abc");
        hashMap.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");
        hashMap.put(TestDynamicTableFactory.KEY_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap.put(formatPrefix + TestFormatFactory.DELIMITER.key(), "|");
        hashMap.put(formatPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        hashMap.put(TestDynamicTableFactory.VALUE_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap.put(formatPrefix2 + TestFormatFactory.DELIMITER.key(), "|");
        hashMap.put(formatPrefix2 + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        hashMap2.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");
        hashMap2.put(TestDynamicTableFactory.KEY_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap2.put(formatPrefix + TestFormatFactory.DELIMITER.key(), ",");
        hashMap2.put(formatPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        hashMap2.put(TestDynamicTableFactory.VALUE_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap2.put(formatPrefix2 + TestFormatFactory.DELIMITER.key(), "|");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap, hashMap2));
        TestFormatFactory.DecodingFormatMock decodingFormatMock = (TestFormatFactory.DecodingFormatMock) createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, TestDynamicTableFactory.KEY_FORMAT);
        TestFormatFactory.DecodingFormatMock decodingFormatMock2 = (TestFormatFactory.DecodingFormatMock) createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, TestDynamicTableFactory.VALUE_FORMAT);
        createTableFactoryHelper.validate();
        Assertions.assertThat((String) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.TARGET)).isEqualTo("abc");
        Assertions.assertThat((Long) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE)).isEqualTo(2000L);
        Assertions.assertThat(decodingFormatMock.delimiter).isEqualTo(",");
        Assertions.assertThat(decodingFormatMock.failOnMissing).isTrue();
        Assertions.assertThat(decodingFormatMock2.delimiter).isEqualTo("|");
        Assertions.assertThat(decodingFormatMock2.failOnMissing).isTrue();
    }

    @Test
    void testFactoryHelperWithEnrichmentOptionsMissingFormatIdentifier() {
        HashMap hashMap = new HashMap();
        hashMap.put(TestDynamicTableFactory.TARGET.key(), "abc");
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        hashMap2.put(TestDynamicTableFactory.KEY_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap2.put(FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, TestFormatFactory.IDENTIFIER) + TestFormatFactory.DELIMITER.key(), ",");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap, hashMap2));
        Assertions.assertThatThrownBy(() -> {
            createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, TestDynamicTableFactory.KEY_FORMAT);
        }).isInstanceOf(ValidationException.class).hasMessageContaining(String.format("The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'", TestDynamicTableFactory.KEY_FORMAT, TestFormatFactory.IDENTIFIER));
    }

    @Test
    void testFactoryHelperWithEnrichmentOptionsFormatMismatch() {
        String formatPrefix = FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, TestFormatFactory.IDENTIFIER);
        HashMap hashMap = new HashMap();
        hashMap.put(TestDynamicTableFactory.TARGET.key(), "abc");
        hashMap.put(TestDynamicTableFactory.KEY_FORMAT.key(), TestFormatFactory.IDENTIFIER);
        hashMap.put(formatPrefix + TestFormatFactory.DELIMITER.key(), "|");
        hashMap.put(formatPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        hashMap2.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");
        hashMap2.put(TestDynamicTableFactory.KEY_FORMAT.key(), "another-format");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap, hashMap2));
        Assertions.assertThatThrownBy(() -> {
            createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, TestDynamicTableFactory.KEY_FORMAT);
        }).isInstanceOf(ValidationException.class).hasMessageContaining(String.format("Both persisted plan table and catalog table define the format option '%s', but they mismatch: '%s' != '%s'", TestDynamicTableFactory.KEY_FORMAT, TestFormatFactory.IDENTIFIER, "another-format"));
    }

    @Test
    void testFactoryHelperWithEmptyEnrichmentOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put(TestDynamicTableFactory.TARGET.key(), "abc");
        hashMap.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestDynamicTableFactory(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap));
        createTableFactoryHelper.validate();
        Assertions.assertThat((String) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.TARGET)).isEqualTo("abc");
        Assertions.assertThat((Long) createTableFactoryHelper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE)).isEqualTo(1000L);
    }

    @Test
    void testFactoryHelperWithMapOption() {
        HashMap hashMap = new HashMap();
        hashMap.put("properties.prop-1", "value-1");
        hashMap.put("properties.prop-2", "value-2");
        FactoryUtil.createTableFactoryHelper(new TestFactoryWithMap(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap)).validate();
    }

    @Test
    void testInvalidFactoryHelperWithMapOption() {
        HashMap hashMap = new HashMap();
        hashMap.put("properties.prop-1", "value-1");
        hashMap.put("properties.prop-2", "value-2");
        hashMap.put("unknown", "value-3");
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(new TestFactoryWithMap(), FactoryMocks.createTableContext(FactoryMocks.SCHEMA, hashMap));
        createTableFactoryHelper.getClass();
        Assertions.assertThatThrownBy(createTableFactoryHelper::validate).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, "Unsupported options found for 'test-factory-with-map'.\n\nUnsupported options:\n\nunknown\n\nSupported options:\n\nconnector\nproperties\nproperties.prop-1\nproperties.prop-2\nproperty-version")});
    }

    @Test
    void testDiscoverFactoryBadClass(@TempDir Path path) throws IOException {
        URLClassLoader build = ClassLoaderUtils.withRoot(path.toFile()).addClass("MyFancySerializationSchemaFormat", "public interface MyFancySerializationSchemaFormat extends " + SerializationFormatFactory.class.getName() + " {}").addClass("MyFancySerializationSchemaFormatImpl", "import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.configuration.ConfigOption;import org.apache.flink.configuration.ReadableConfig;import org.apache.flink.table.connector.format.EncodingFormat;import org.apache.flink.table.data.RowData;import org.apache.flink.table.factories.DynamicTableFactory;import org.apache.flink.table.factories.SerializationFormatFactory;import java.util.Set;public class MyFancySerializationSchemaFormatImpl implements MyFancySerializationSchemaFormat {@Override public String factoryIdentifier() { return null; }@Override public Set<ConfigOption<?>> requiredOptions() { return null; }@Override public Set<ConfigOption<?>> optionalOptions() { return null; }@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }}").addClass("AnotherSerializationSchema", "import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.configuration.ConfigOption;import org.apache.flink.configuration.ReadableConfig;import org.apache.flink.table.connector.format.EncodingFormat;import org.apache.flink.table.data.RowData;import org.apache.flink.table.factories.DynamicTableFactory;import org.apache.flink.table.factories.SerializationFormatFactory;import java.util.Set;public class AnotherSerializationSchema implements " + SerializationFormatFactory.class.getName() + " {@Override public String factoryIdentifier() { return null; }@Override public Set<ConfigOption<?>> requiredOptions() { return null; }@Override public Set<ConfigOption<?>> optionalOptions() { return null; }@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }}").addService(Factory.class.getName(), "MyFancySerializationSchemaFormatImpl").addService(Factory.class.getName(), "AnotherSerializationSchema").build();
        Files.delete(path.resolve("MyFancySerializationSchemaFormat.class"));
        Assertions.assertThat(FactoryUtil.discoverFactories(build)).map(factory -> {
            return factory.getClass().getName();
        }).doesNotContain(new String[]{"MyFancySerializationSchemaFormatImpl"}).contains(new String[]{"AnotherSerializationSchema"});
    }

    @Test
    void testDiscoverFactoryFromClosedClassLoader() throws Exception {
        MutableURLClassLoader create = FlinkUserCodeClassLoaders.create(new URL[0], FactoryUtilTest.class.getClassLoader(), new Configuration());
        create.close();
        Assertions.assertThatThrownBy(() -> {
            FactoryUtil.discoverFactory(create, Factory.class, "test");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(IllegalStateException.class, "Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'")});
    }

    private static void assertCreateTableSourceWithOptionModifier(Consumer<Map<String, String>> consumer, String... strArr) {
        AbstractThrowableAssert assertThatThrownBy = Assertions.assertThatThrownBy(() -> {
            Map<String, String> createAllOptions = createAllOptions();
            consumer.accept(createAllOptions);
            FactoryMocks.createTableSource(FactoryMocks.SCHEMA, createAllOptions);
        });
        for (String str : strArr) {
            assertThatThrownBy.satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, str)});
        }
    }

    private static Map<String, String> createAllOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("property-version", "1");
        hashMap.put("connector", TestDynamicTableFactory.IDENTIFIER);
        hashMap.put("target", "MyTarget");
        hashMap.put("buffer-size", "1000");
        hashMap.put("key.format", TestFormatFactory.IDENTIFIER);
        hashMap.put("key.test-format.delimiter", ",");
        hashMap.put("value.format", TestFormatFactory.IDENTIFIER);
        hashMap.put("value.test-format.delimiter", "|");
        hashMap.put("value.test-format.fail-on-missing", "true");
        return hashMap;
    }

    private static Map<String, String> createWatermarkOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("scan.watermark.emit.strategy", "on-event");
        hashMap.put("scan.watermark.alignment.group", "group1");
        hashMap.put("scan.watermark.alignment.max-drift", "1min");
        hashMap.put("scan.watermark.alignment.update-interval", "1s");
        hashMap.put("scan.watermark.idle-timeout", "1min");
        return hashMap;
    }
}
