package org.apache.flink.connector.pulsar.table;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
import org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils;

/* loaded from: input_file:org/apache/flink/connector/pulsar/table/PulsarTableFactory.class */
public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "pulsar";
    public static final String DEFAULT_SUBSCRIPTION_NAME_PREFIX = "flink-sql-connector-pulsar-";
    public static final boolean UPSERT_DISABLED = false;

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = PulsarTableOptionUtils.getKeyDecodingFormat(createTableFactoryHelper);
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = PulsarTableOptionUtils.getValueDecodingFormat(createTableFactoryHelper);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validateExcept(new String[]{PulsarOptions.CLIENT_CONFIG_PREFIX, PulsarSourceOptions.SOURCE_CONFIG_PREFIX, PulsarSourceOptions.CONSUMER_CONFIG_PREFIX, PulsarSinkOptions.PRODUCER_CONFIG_PREFIX, PulsarSinkOptions.SINK_CONFIG_PREFIX});
        PulsarTableValidationUtils.validatePrimaryKeyConstraints(context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), createTableFactoryHelper);
        PulsarTableValidationUtils.validateTableSourceOptions(options);
        List<String> topicListFromOptions = PulsarTableOptionUtils.getTopicListFromOptions(options);
        StartCursor startCursor = PulsarTableOptionUtils.getStartCursor(options);
        StopCursor stopCursor = PulsarTableOptionUtils.getStopCursor(options);
        SubscriptionType subscriptionType = PulsarTableOptionUtils.getSubscriptionType(options);
        Properties pulsarProperties = PulsarTableOptionUtils.getPulsarProperties(options);
        pulsarProperties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), (String) options.get(PulsarTableOptions.SERVICE_URL));
        pulsarProperties.setProperty(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME.key(), (String) options.getOptional(PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME).orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX + RandomStringUtils.randomAlphabetic(5)));
        DataType physicalRowDataType = context.getPhysicalRowDataType();
        return new PulsarTableSource(new PulsarTableDeserializationSchemaFactory(physicalRowDataType, keyDecodingFormat, PulsarTableOptionUtils.createKeyFormatProjection(options, physicalRowDataType), valueDecodingFormat, PulsarTableOptionUtils.createValueFormatProjection(options, physicalRowDataType), false), valueDecodingFormat, valueDecodingFormat.getChangelogMode(), topicListFromOptions, pulsarProperties, startCursor, stopCursor, subscriptionType);
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = PulsarTableOptionUtils.getKeyEncodingFormat(createTableFactoryHelper);
        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = PulsarTableOptionUtils.getValueEncodingFormat(createTableFactoryHelper);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validateExcept(new String[]{PulsarOptions.CLIENT_CONFIG_PREFIX, PulsarSourceOptions.SOURCE_CONFIG_PREFIX, PulsarSourceOptions.CONSUMER_CONFIG_PREFIX, PulsarSinkOptions.PRODUCER_CONFIG_PREFIX, PulsarSinkOptions.SINK_CONFIG_PREFIX});
        PulsarTableValidationUtils.validatePrimaryKeyConstraints(context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), createTableFactoryHelper);
        PulsarTableValidationUtils.validateTableSinkOptions(options);
        TopicRouter<RowData> topicRouter = PulsarTableOptionUtils.getTopicRouter(options, context.getClassLoader());
        TopicRoutingMode topicRoutingMode = PulsarTableOptionUtils.getTopicRoutingMode(options);
        long messageDelayMillis = PulsarTableOptionUtils.getMessageDelayMillis(options);
        List<String> topicListFromOptions = PulsarTableOptionUtils.getTopicListFromOptions(options);
        Properties pulsarProperties = PulsarTableOptionUtils.getPulsarProperties(options);
        pulsarProperties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), (String) options.get(PulsarTableOptions.SERVICE_URL));
        DataType physicalRowDataType = context.getPhysicalRowDataType();
        return new PulsarTableSink(new PulsarTableSerializationSchemaFactory(physicalRowDataType, keyEncodingFormat, PulsarTableOptionUtils.createKeyFormatProjection(options, physicalRowDataType), valueEncodingFormat, PulsarTableOptionUtils.createValueFormatProjection(options, physicalRowDataType), false), valueEncodingFormat.getChangelogMode(), topicListFromOptions, pulsarProperties, DeliveryGuarantee.AT_LEAST_ONCE, topicRouter, topicRoutingMode, messageDelayMillis);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return (Set) Stream.of((Object[]) new ConfigOption[]{PulsarTableOptions.TOPICS, PulsarTableOptions.SERVICE_URL}).collect(Collectors.toSet());
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return (Set) Stream.of((Object[]) new ConfigOption[]{FactoryUtil.FORMAT, PulsarTableOptions.VALUE_FORMAT, PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME, PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE, PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID, PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME, PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME, PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER, PulsarTableOptions.SINK_TOPIC_ROUTING_MODE, PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL, FactoryUtil.SINK_PARALLELISM, PulsarTableOptions.KEY_FORMAT, PulsarTableOptions.KEY_FIELDS, PulsarTableOptions.EXPLICIT}).collect(Collectors.toSet());
    }

    public Set<ConfigOption<?>> forwardOptions() {
        return (Set) Stream.of((Object[]) new ConfigOption[]{PulsarTableOptions.TOPICS, PulsarTableOptions.SERVICE_URL, PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE, PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME, PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID, PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME, PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME, PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER, PulsarTableOptions.SINK_TOPIC_ROUTING_MODE, PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL}).collect(Collectors.toSet());
    }
}
