package org.apache.flink.streaming.connectors.kafka.table;

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.class */
public abstract class KafkaDynamicTableFactoryBase implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        String str = (String) options.get(KafkaOptions.TOPIC);
        DecodingFormat<DeserializationSchema<RowData>> discoverDecodingFormat = createTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);
        createTableFactoryHelper.validateExcept(new String[]{KafkaOptions.PROPERTIES_PREFIX});
        KafkaOptions.validateTableOptions(options);
        DataType physicalRowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        KafkaOptions.StartupOptions startupOptions = KafkaOptions.getStartupOptions(options, str);
        return createKafkaTableSource(physicalRowDataType, str, KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions()), discoverDecodingFormat, startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.startupTimestampMillis);
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        String str = (String) options.get(KafkaOptions.TOPIC);
        EncodingFormat<SerializationSchema<RowData>> discoverEncodingFormat = createTableFactoryHelper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
        createTableFactoryHelper.validateExcept(new String[]{KafkaOptions.PROPERTIES_PREFIX});
        KafkaOptions.validateTableOptions(options);
        return createKafkaTableSink(context.getCatalogTable().getSchema().toPhysicalRowDataType(), str, KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions()), KafkaOptions.getFlinkKafkaPartitioner(options, context.getClassLoader()), discoverEncodingFormat);
    }

    protected abstract KafkaDynamicSourceBase createKafkaTableSource(DataType dataType, String str, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, long j);

    protected abstract KafkaDynamicSinkBase createKafkaTableSink(DataType dataType, String str, Properties properties, Optional<FlinkKafkaPartitioner<RowData>> optional, EncodingFormat<SerializationSchema<RowData>> encodingFormat);

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(KafkaOptions.TOPIC);
        hashSet.add(FactoryUtil.FORMAT);
        hashSet.add(KafkaOptions.PROPS_BOOTSTRAP_SERVERS);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(KafkaOptions.PROPS_GROUP_ID);
        hashSet.add(KafkaOptions.SCAN_STARTUP_MODE);
        hashSet.add(KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS);
        hashSet.add(KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
        hashSet.add(KafkaOptions.SINK_PARTITIONER);
        return hashSet;
    }
}
