package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.class */
class KafkaConnectorOptionsUtil {
    /** Sink partitioner value for which {@code getFlinkKafkaPartitioner} yields no Flink-side partitioner. */
    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
    /** Sink partitioner value that selects {@link FlinkFixedPartitioner}. */
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    /** Sink partitioner value that also yields no Flink-side partitioner; rejected when key fields are declared. */
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
    /** Prefix of table options that {@code getKafkaProperties} forwards (with the prefix stripped) as Kafka client properties. */
    public static final String PROPERTIES_PREFIX = "properties.";
    // Field names of one specific-offsets entry ("partition:<n>,offset:<m>").
    // NOTE(review): not referenced by name in the visible code; parseSpecificOffsets matches the literals directly.
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";
    /** Option whose key is appended to a format prefix to form the schema-registry subject option (see {@code autoCompleteSubject}). */
    private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT = ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
    // Format identifiers for which a schema-registry subject is auto-completed.
    protected static final String AVRO_CONFLUENT = "avro-confluent";
    protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
    /** Formats checked by {@code autoCompleteSchemaRegistrySubject} before filling in a subject. */
    private static final List<String> SCHEMA_REGISTRY_FORMATS = Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);

    /**
     * Mutable holder for the bounded-scan settings assembled by
     * {@link KafkaConnectorOptionsUtil#getBoundedOptions(ReadableConfig)}.
     */
    public static class BoundedOptions {
        /** Bounded mode translated from {@code KafkaConnectorOptions.SCAN_BOUNDED_MODE}. */
        public BoundedMode boundedMode;
        /** Per-partition end offsets; only populated when the mode is {@code SPECIFIC_OFFSETS}, otherwise an empty map. */
        public Map<KafkaTopicPartition, Long> specificOffsets;
        /** End timestamp in milliseconds; only assigned when the mode is {@code TIMESTAMP}, otherwise left at 0. */
        public long boundedTimestampMillis;
    }

    /**
     * Mutable holder for the startup settings assembled by
     * {@link KafkaConnectorOptionsUtil#getStartupOptions(ReadableConfig)}.
     */
    public static class StartupOptions {
        /** Startup mode translated from {@code KafkaConnectorOptions.SCAN_STARTUP_MODE}. */
        public StartupMode startupMode;
        /** Per-partition start offsets; only populated when the mode is {@code SPECIFIC_OFFSETS}, otherwise an empty map. */
        public Map<KafkaTopicPartition, Long> specificOffsets;
        /** Start timestamp in milliseconds; only assigned when the mode is {@code TIMESTAMP}, otherwise left at 0. */
        public long startupTimestampMillis;
    }

    /**
     * Runs all source-side option checks, in this order: topic selection, startup mode,
     * bounded mode. The first failing check determines the reported error.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if any of the delegated checks fails
     */
    public static void validateTableSourceOptions(ReadableConfig readableConfig) {
        validateSourceTopic(readableConfig);
        validateScanStartupMode(readableConfig);
        validateScanBoundedMode(readableConfig);
    }

    /**
     * Runs all sink-side option checks: first the single-topic requirement, then the
     * sink partitioner setting.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if any of the delegated checks fails
     */
    public static void validateTableSinkOptions(ReadableConfig readableConfig) {
        validateSinkTopic(readableConfig);
        validateSinkPartitioner(readableConfig);
    }

    /**
     * Checks that a source declares its topics in exactly one way: either the topic
     * list or the topic pattern, never both and never neither.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if both options are set or both are missing
     */
    public static void validateSourceTopic(ReadableConfig readableConfig) {
        final boolean hasTopicList = readableConfig.getOptional(KafkaConnectorOptions.TOPIC).isPresent();
        final boolean hasTopicPattern = readableConfig.getOptional(KafkaConnectorOptions.TOPIC_PATTERN).isPresent();

        if (hasTopicList && hasTopicPattern) {
            throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!(hasTopicList || hasTopicPattern)) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    /**
     * Checks that a sink writes to exactly one topic.
     *
     * <p>When the check fails, the error names the topic pattern if one is configured,
     * otherwise it names the topic list.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if the topic option does not hold exactly one entry
     */
    public static void validateSinkTopic(ReadableConfig readableConfig) {
        if (!isSingleTopic(readableConfig)) {
            final String template = "Flink Kafka sink currently only supports single topic, but got %s: %s.";
            if (readableConfig.getOptional(KafkaConnectorOptions.TOPIC_PATTERN).isPresent()) {
                throw new ValidationException(String.format(template, "'topic-pattern'", readableConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
            }
            throw new ValidationException(String.format(template, "'topic'", readableConfig.get(KafkaConnectorOptions.TOPIC)));
        }
    }

    /**
     * Validates the companion options required by the configured startup mode.
     *
     * <p>Nothing is checked when no startup mode is set. Specific-offsets mode needs the
     * offsets option, a single topic, and a parsable offsets string; timestamp mode needs
     * the timestamp option. All other modes pass unchecked.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if a required companion option is missing or malformed
     */
    private static void validateScanStartupMode(ReadableConfig readableConfig) {
        final String missingTemplate = "'%s' is required in '%s' startup mode but missing.";
        readableConfig.getOptional(KafkaConnectorOptions.SCAN_STARTUP_MODE).ifPresent(mode -> {
            if (mode == KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS) {
                final String offsetsKey = KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key();
                if (!readableConfig.getOptional(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format(missingTemplate, offsetsKey, KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS));
                }
                if (!isSingleTopic(readableConfig)) {
                    throw new ValidationException("Currently Kafka source only supports specific offset for single topic.");
                }
                // Parsed only for validation; the result is discarded here.
                parseSpecificOffsets(readableConfig.get(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS), offsetsKey);
            } else if (mode == KafkaConnectorOptions.ScanStartupMode.TIMESTAMP) {
                if (!readableConfig.getOptional(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
                    throw new ValidationException(String.format(missingTemplate, KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key(), KafkaConnectorOptions.ScanStartupMode.TIMESTAMP));
                }
            }
        });
    }

    /**
     * Validates the companion options required by the configured bounded mode.
     *
     * <p>Nothing is checked when no bounded mode is set. Timestamp mode needs the
     * timestamp option; specific-offsets mode needs the offsets option, a single topic,
     * and a parsable offsets string. All other modes pass unchecked.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if a required companion option is missing or malformed
     */
    private static void validateScanBoundedMode(ReadableConfig readableConfig) {
        final String missingTemplate = "'%s' is required in '%s' bounded mode but missing.";
        readableConfig.getOptional(KafkaConnectorOptions.SCAN_BOUNDED_MODE).ifPresent(mode -> {
            if (mode == KafkaConnectorOptions.ScanBoundedMode.TIMESTAMP) {
                if (!readableConfig.getOptional(KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS).isPresent()) {
                    throw new ValidationException(String.format(missingTemplate, KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS.key(), KafkaConnectorOptions.ScanBoundedMode.TIMESTAMP));
                }
            } else if (mode == KafkaConnectorOptions.ScanBoundedMode.SPECIFIC_OFFSETS) {
                final String offsetsKey = KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS.key();
                if (!readableConfig.getOptional(KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format(missingTemplate, offsetsKey, KafkaConnectorOptions.ScanBoundedMode.SPECIFIC_OFFSETS));
                }
                if (!isSingleTopic(readableConfig)) {
                    throw new ValidationException("Currently Kafka source only supports specific offset for single topic.");
                }
                // Parsed only for validation; the result is discarded here.
                parseSpecificOffsets(readableConfig.get(KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS), offsetsKey);
            }
        });
    }

    /**
     * Validates the sink partitioner option when it is set.
     *
     * <p>The round-robin partitioner is rejected if key fields are declared, and an empty
     * partitioner string is rejected outright.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if the partitioner setting is not acceptable
     */
    private static void validateSinkPartitioner(ReadableConfig readableConfig) {
        readableConfig.getOptional(KafkaConnectorOptions.SINK_PARTITIONER).ifPresent(partitioner -> {
            final boolean roundRobin = SINK_PARTITIONER_VALUE_ROUND_ROBIN.equals(partitioner);
            if (roundRobin && readableConfig.getOptional(KafkaConnectorOptions.KEY_FIELDS).isPresent()) {
                throw new ValidationException("Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
            }
            if (partitioner.isEmpty()) {
                throw new ValidationException(String.format("Option '%s' should be a non-empty string.", KafkaConnectorOptions.SINK_PARTITIONER.key()));
            }
        });
    }

    /**
     * Returns the configured topic list.
     *
     * @param readableConfig the table options to read
     * @return the topic list, or {@code null} when the topic option is not set
     */
    public static List<String> getSourceTopics(ReadableConfig readableConfig) {
        final Optional<List<String>> topics = readableConfig.getOptional(KafkaConnectorOptions.TOPIC);
        return topics.isPresent() ? topics.get() : null;
    }

    /**
     * Returns the configured topic pattern, compiled as a regular expression.
     *
     * @param readableConfig the table options to read
     * @return the compiled pattern, or {@code null} when the topic-pattern option is not set
     */
    public static Pattern getSourceTopicPattern(ReadableConfig readableConfig) {
        final Optional<String> regex = readableConfig.getOptional(KafkaConnectorOptions.TOPIC_PATTERN);
        return regex.isPresent() ? Pattern.compile(regex.get()) : null;
    }

    /**
     * Tells whether the topic option is set and holds exactly one topic.
     *
     * @param readableConfig the table options to read
     * @return {@code true} only if the topic list is present with a single entry
     */
    private static boolean isSingleTopic(ReadableConfig readableConfig) {
        return readableConfig
                .getOptional(KafkaConnectorOptions.TOPIC)
                .filter(topics -> topics.size() == 1)
                .isPresent();
    }

    /**
     * Builds the startup options from the table options.
     *
     * <p>The mode falls back to {@code GROUP_OFFSETS} when no startup mode is set. The
     * offsets map is filled only in specific-offsets mode (for the first configured
     * topic), and the timestamp is read only in timestamp mode.
     *
     * @param readableConfig the table options to read
     * @return a freshly populated {@link StartupOptions}
     */
    public static StartupOptions getStartupOptions(ReadableConfig readableConfig) {
        final Optional<KafkaConnectorOptions.ScanStartupMode> configuredMode =
                readableConfig.getOptional(KafkaConnectorOptions.SCAN_STARTUP_MODE);
        final StartupMode mode =
                configuredMode.isPresent() ? fromOption(configuredMode.get()) : StartupMode.GROUP_OFFSETS;

        final Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        if (mode == StartupMode.SPECIFIC_OFFSETS) {
            final String topic = readableConfig.get(KafkaConnectorOptions.TOPIC).get(0);
            buildSpecificOffsets(readableConfig, topic, offsets);
        }

        final StartupOptions result = new StartupOptions();
        result.startupMode = mode;
        result.specificOffsets = offsets;
        if (mode == StartupMode.TIMESTAMP) {
            result.startupTimestampMillis = readableConfig.get(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
        }
        return result;
    }

    /**
     * Builds the bounded-scan options from the table options.
     *
     * <p>The offsets map is filled only in specific-offsets mode (for the first
     * configured topic), and the timestamp is read only in timestamp mode.
     *
     * @param readableConfig the table options to read
     * @return a freshly populated {@link BoundedOptions}
     */
    public static BoundedOptions getBoundedOptions(ReadableConfig readableConfig) {
        final BoundedMode mode = fromOption(readableConfig.get(KafkaConnectorOptions.SCAN_BOUNDED_MODE));

        final Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        if (mode == BoundedMode.SPECIFIC_OFFSETS) {
            final String topic = readableConfig.get(KafkaConnectorOptions.TOPIC).get(0);
            buildBoundedOffsets(readableConfig, topic, offsets);
        }

        final BoundedOptions result = new BoundedOptions();
        result.boundedMode = mode;
        result.specificOffsets = offsets;
        if (mode == BoundedMode.TIMESTAMP) {
            result.boundedTimestampMillis = readableConfig.get(KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS);
        }
        return result;
    }

    /**
     * Parses the startup specific-offsets option and adds one entry per partition to
     * {@code map}, keyed by the given topic and the partition number.
     *
     * @param readableConfig the table options holding the offsets string
     * @param str the topic the partitions belong to
     * @param map the map that receives the parsed offsets
     */
    private static void buildSpecificOffsets(ReadableConfig readableConfig, String str, Map<KafkaTopicPartition, Long> map) {
        final Map<Integer, Long> parsed =
                parseSpecificOffsets(
                        readableConfig.get(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS),
                        KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key());
        for (Map.Entry<Integer, Long> entry : parsed.entrySet()) {
            map.put(new KafkaTopicPartition(str, entry.getKey()), entry.getValue());
        }
    }

    /**
     * Parses the bounded specific-offsets option and adds one entry per partition to
     * {@code map}, keyed by the given topic and the partition number.
     *
     * @param readableConfig the table options holding the offsets string
     * @param str the topic the partitions belong to
     * @param map the map that receives the parsed offsets
     */
    public static void buildBoundedOffsets(ReadableConfig readableConfig, String str, Map<KafkaTopicPartition, Long> map) {
        final Map<Integer, Long> parsed =
                parseSpecificOffsets(
                        readableConfig.get(KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS),
                        KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
        for (Map.Entry<Integer, Long> entry : parsed.entrySet()) {
            map.put(new KafkaTopicPartition(str, entry.getKey()), entry.getValue());
        }
    }

    /**
     * Translates the table-level startup mode into the connector's {@link StartupMode}.
     *
     * @param scanStartupMode the mode read from the table options
     * @return the matching connector startup mode
     * @throws TableException if the mode has no known translation
     */
    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
        final StartupMode translated;
        switch (scanStartupMode) {
            case EARLIEST_OFFSET:
                translated = StartupMode.EARLIEST;
                break;
            case LATEST_OFFSET:
                translated = StartupMode.LATEST;
                break;
            case GROUP_OFFSETS:
                translated = StartupMode.GROUP_OFFSETS;
                break;
            case SPECIFIC_OFFSETS:
                translated = StartupMode.SPECIFIC_OFFSETS;
                break;
            case TIMESTAMP:
                translated = StartupMode.TIMESTAMP;
                break;
            default:
                throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }
        return translated;
    }

    /**
     * Translates the table-level bounded mode into the connector's {@link BoundedMode}.
     *
     * @param scanBoundedMode the mode read from the table options
     * @return the matching connector bounded mode
     * @throws TableException if the mode has no known translation
     */
    private static BoundedMode fromOption(KafkaConnectorOptions.ScanBoundedMode scanBoundedMode) {
        final BoundedMode translated;
        switch (scanBoundedMode) {
            case UNBOUNDED:
                translated = BoundedMode.UNBOUNDED;
                break;
            case LATEST_OFFSET:
                translated = BoundedMode.LATEST;
                break;
            case GROUP_OFFSETS:
                translated = BoundedMode.GROUP_OFFSETS;
                break;
            case TIMESTAMP:
                translated = BoundedMode.TIMESTAMP;
                break;
            case SPECIFIC_OFFSETS:
                translated = BoundedMode.SPECIFIC_OFFSETS;
                break;
            default:
                throw new TableException("Unsupported bounded mode. Validator should have checked that.");
        }
        return translated;
    }

    /**
     * Extracts the Kafka client properties from the table options.
     *
     * <p>Every option whose key starts with {@code "properties."} is copied into the
     * result with that prefix removed; all other options are ignored.
     *
     * @param map the raw table options
     * @return the extracted client properties, empty if no prefixed option exists
     */
    public static Properties getKafkaProperties(Map<String, String> map) {
        final Properties kafkaProperties = new Properties();
        if (!hasKafkaClientProperties(map)) {
            return kafkaProperties;
        }
        final String prefix = "properties.";
        for (String optionKey : map.keySet()) {
            if (optionKey.startsWith(prefix)) {
                kafkaProperties.put(optionKey.substring(prefix.length()), map.get(optionKey));
            }
        }
        return kafkaProperties;
    }

    /**
     * Resolves the Flink-side partitioner selected by the sink partitioner option.
     *
     * <ul>
     *   <li>{@code "fixed"} yields a {@link FlinkFixedPartitioner}.
     *   <li>{@code "default"} and {@code "round-robin"} yield an empty result, i.e. no
     *       Flink-side partitioner.
     *   <li>Any other value is treated as a partitioner class name and instantiated
     *       through the given class loader.
     * </ul>
     *
     * @param readableConfig the table options to read
     * @param classLoader the class loader used to load a custom partitioner class
     * @return the partitioner to use, or empty if the option is unset or maps to none
     * @throws ValidationException if a custom class cannot be loaded, instantiated, or
     *     is not a {@link FlinkKafkaPartitioner}
     */
    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig readableConfig, ClassLoader classLoader) {
        return readableConfig
                .getOptional(KafkaConnectorOptions.SINK_PARTITIONER)
                .flatMap(
                        (String partitioner) -> {
                            switch (partitioner) {
                                case SINK_PARTITIONER_VALUE_FIXED:
                                    return Optional.of(new FlinkFixedPartitioner<>());
                                case SINK_PARTITIONER_VALUE_DEFAULT:
                                case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                    return Optional.empty();
                                default:
                                    // Anything else is interpreted as a fully qualified class name.
                                    return Optional.of(initializePartitioner(partitioner, classLoader));
                            }
                        });
    }

    /**
     * Parses a specific-offsets string of the form
     * {@code partition:0,offset:42;partition:1,offset:300}.
     *
     * <p>Entries are separated by {@code ';'}; each entry must consist of exactly a
     * {@code partition:<int>} part and an {@code offset:<long>} part separated by
     * {@code ','}. No whitespace is tolerated around the parts.
     *
     * @param str the offsets string to parse
     * @param str2 the option key, used only in the error message
     * @return a map from partition number to offset
     * @throws ValidationException if the string does not follow the expected format
     */
    public static Map<Integer, Long> parseSpecificOffsets(String str, String str2) {
        final String[] entries = str.split(";");
        final String errorMessage = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", str2, str);
        if (entries.length == 0) {
            throw new ValidationException(errorMessage);
        }

        final Map<Integer, Long> offsets = new HashMap<>();
        for (String entry : entries) {
            if (entry == null || entry.isEmpty() || !entry.contains(",")) {
                throw new ValidationException(errorMessage);
            }
            final String[] parts = entry.split(",");
            final boolean wellFormed =
                    parts.length == 2
                            && parts[0].startsWith("partition:")
                            && parts[1].startsWith("offset:");
            if (!wellFormed) {
                throw new ValidationException(errorMessage);
            }
            try {
                final Integer partition = Integer.valueOf(parts[0].substring(parts[0].indexOf(":") + 1));
                final Long offset = Long.valueOf(parts[1].substring(parts[1].indexOf(":") + 1));
                offsets.put(partition, offset);
            } catch (NumberFormatException e) {
                throw new ValidationException(errorMessage, e);
            }
        }
        return offsets;
    }

    /**
     * Tells whether at least one option key starts with {@code "properties."}.
     *
     * @param map the raw table options
     * @return {@code true} if any key carries the client-properties prefix
     */
    private static boolean hasKafkaClientProperties(Map<String, String> map) {
        for (String optionKey : map.keySet()) {
            if (optionKey.startsWith("properties.")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Loads and instantiates a custom partitioner class by name.
     *
     * @param str the fully qualified class name of the partitioner
     * @param classLoader the class loader used to resolve the class
     * @return a new instance of the named partitioner
     * @throws ValidationException if the class is not a {@link FlinkKafkaPartitioner},
     *     cannot be found, or cannot be instantiated
     */
    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(String str, ClassLoader classLoader) {
        try {
            final Class<?> candidate = Class.forName(str, true, classLoader);
            if (!FlinkKafkaPartitioner.class.isAssignableFrom(candidate)) {
                throw new ValidationException(String.format("Sink partitioner class '%s' should extend from the required class %s", str, FlinkKafkaPartitioner.class.getName()));
            }
            @SuppressWarnings("unchecked")
            final FlinkKafkaPartitioner<T> partitioner = InstantiationUtil.instantiate(str, FlinkKafkaPartitioner.class, classLoader);
            return partitioner;
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate partitioner class '%s'", str), e);
        }
    }

    /**
     * Computes the positions of the key fields within the physical row type.
     *
     * <p>Key fields may only be declared together with a key format, and a key format
     * requires at least one key field. Without a key format the projection is empty.
     * Each key field must exist in the row type and must start with the configured key
     * prefix (the empty string when no prefix is set).
     *
     * @param readableConfig the table options to read
     * @param dataType the physical data type; must be a row type
     * @return the indices of the key fields, in the order they are declared
     * @throws IllegalArgumentException if {@code dataType} is not a row type
     * @throws ValidationException if the key options are inconsistent, or a key field is
     *     unknown or lacks the prefix
     */
    public static int[] createKeyFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        final LogicalType physicalType = dataType.getLogicalType();
        Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");

        // Typed Optionals/Lists are required here: with raw types the stream lambda
        // below would see its element as Object and could not call String methods.
        final Optional<String> keyFormat = readableConfig.getOptional(KafkaConnectorOptions.KEY_FORMAT);
        final Optional<List<String>> keyFields = readableConfig.getOptional(KafkaConnectorOptions.KEY_FIELDS);

        if (!keyFormat.isPresent()) {
            if (keyFields.isPresent()) {
                throw new ValidationException(String.format("The option '%s' can only be declared if a key format is defined using '%s'.", KafkaConnectorOptions.KEY_FIELDS.key(), KafkaConnectorOptions.KEY_FORMAT.key()));
            }
            return new int[0];
        }
        if (!keyFields.isPresent() || keyFields.get().isEmpty()) {
            throw new ValidationException(String.format("A key format '%s' requires the declaration of one or more of key fields using '%s'.", KafkaConnectorOptions.KEY_FORMAT.key(), KafkaConnectorOptions.KEY_FIELDS.key()));
        }

        final String keyPrefix = readableConfig.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("");
        final List<String> declaredKeyFields = keyFields.get();
        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
        return declaredKeyFields.stream()
                .mapToInt(
                        keyField -> {
                            final int position = physicalFields.indexOf(keyField);
                            if (position < 0) {
                                throw new ValidationException(String.format("Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option:\n%s", keyField, KafkaConnectorOptions.KEY_FIELDS.key(), physicalFields));
                            }
                            if (!keyField.startsWith(keyPrefix)) {
                                throw new ValidationException(String.format("All fields in '%s' must be prefixed with '%s' when option '%s' is set but field '%s' is not prefixed.", KafkaConnectorOptions.KEY_FIELDS.key(), keyPrefix, KafkaConnectorOptions.KEY_FIELDS_PREFIX.key(), keyField));
                            }
                            return position;
                        })
                .toArray();
    }

    /**
     * Computes the positions of the value fields within the physical row type.
     *
     * <p>With strategy {@code ALL} every field is part of the value, and a key prefix is
     * not allowed. With strategy {@code EXCEPT_KEY} the value consists of every field
     * that is not selected by {@link #createKeyFormatProjection}.
     *
     * @param readableConfig the table options to read
     * @param dataType the physical data type; must be a row type
     * @return the indices of the value fields in ascending order
     * @throws IllegalArgumentException if {@code dataType} is not a row type
     * @throws ValidationException if a key prefix is combined with strategy {@code ALL},
     *     or the key options are invalid
     * @throws TableException if the strategy is neither {@code ALL} nor {@code EXCEPT_KEY}
     */
    public static int[] createValueFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        final LogicalType physicalType = dataType.getLogicalType();
        Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");

        final IntStream physicalFields = IntStream.range(0, LogicalTypeChecks.getFieldCount(physicalType));
        final String keyPrefix = readableConfig.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("");
        final KafkaConnectorOptions.ValueFieldsStrategy strategy = readableConfig.get(KafkaConnectorOptions.VALUE_FIELDS_INCLUDE);

        if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
            if (!keyPrefix.isEmpty()) {
                throw new ValidationException(String.format("A key prefix is not allowed when option '%s' is set to '%s'. Set it to '%s' instead to avoid field overlaps.", KafkaConnectorOptions.VALUE_FIELDS_INCLUDE.key(), KafkaConnectorOptions.ValueFieldsStrategy.ALL, KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
            }
            return physicalFields.toArray();
        }
        if (strategy != KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
            throw new TableException("Unknown value fields strategy:" + strategy);
        }

        final int[] keyProjection = createKeyFormatProjection(readableConfig, dataType);
        // Keep a field only if its index does not appear in the key projection. The two
        // lambda parameters must have distinct names so the comparison is key vs. field.
        return physicalFields
                .filter(fieldIndex -> IntStream.of(keyProjection).noneMatch(keyIndex -> keyIndex == fieldIndex))
                .toArray();
    }

    /**
     * Returns a context whose table options include auto-completed schema-registry
     * subjects.
     *
     * <p>If completion added no option, the given context is returned unchanged;
     * otherwise a new context is built around a copy of the catalog table that carries
     * the completed options.
     *
     * @param context the factory context of the table
     * @return the original context, or a new one with the completed options
     */
    public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(DynamicTableFactory.Context context) {
        final Map<String, String> tableOptions = context.getCatalogTable().getOptions();
        final Map<String, String> completedOptions = autoCompleteSchemaRegistrySubject(tableOptions);
        if (completedOptions.size() <= tableOptions.size()) {
            return context;
        }
        return new FactoryUtil.DefaultDynamicTableContext(
                context.getObjectIdentifier(),
                context.getCatalogTable().copy(completedOptions),
                context.getEnrichmentOptions(),
                context.getConfiguration(),
                context.getClassLoader(),
                context.isTemporary());
    }

    /**
     * Fills in missing schema-registry subjects derived from the single sink topic.
     *
     * <p>For the value side, the plain format option takes precedence over the value
     * format option; the subject is {@code <topic>-value}. For the key side the subject
     * is {@code <topic>-key}. A subject is only considered for formats listed in
     * {@code SCHEMA_REGISTRY_FORMATS}, and an already configured subject is kept.
     *
     * @param map the raw table options
     * @return the options including any auto-completed subjects
     * @throws ValidationException if the options do not name exactly one topic
     */
    private static Map<String, String> autoCompleteSchemaRegistrySubject(Map<String, String> map) {
        final Configuration configuration = Configuration.fromMap(map);
        validateSinkTopic(configuration);

        final Optional<String> valueFormat = configuration.getOptional(KafkaConnectorOptions.VALUE_FORMAT);
        final Optional<String> keyFormat = configuration.getOptional(KafkaConnectorOptions.KEY_FORMAT);
        final Optional<String> format = configuration.getOptional(FactoryUtil.FORMAT);
        final String topic = configuration.get(KafkaConnectorOptions.TOPIC).get(0);

        final boolean formatUsesRegistry = format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get());
        if (formatUsesRegistry) {
            autoCompleteSubject(configuration, format.get(), topic + "-value");
        } else {
            final boolean valueUsesRegistry = valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get());
            if (valueUsesRegistry) {
                autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
            }
        }

        final boolean keyUsesRegistry = keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get());
        if (keyUsesRegistry) {
            autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
        }
        return configuration.toMap();
    }

    /**
     * Sets the schema-registry subject option under the given prefix unless it is
     * already configured.
     *
     * @param configuration the configuration to update in place
     * @param str the option prefix; the subject key is appended after a {@code '.'}
     * @param str2 the subject to set when none is configured
     */
    private static void autoCompleteSubject(Configuration configuration, String str, String str2) {
        final ConfigOption<String> subjectOption =
                ConfigOptions.key(str + "." + SCHEMA_REGISTRY_SUBJECT.key()).stringType().noDefaultValue();
        if (!configuration.getOptional(subjectOption).isPresent()) {
            configuration.setString(subjectOption, str2);
        }
    }

    /**
     * Checks that a transactional id prefix is configured whenever the delivery
     * guarantee is {@code EXACTLY_ONCE}.
     *
     * <p>Decompiler note: this method was package-private before its access modifier
     * was widened by the decompiler.
     *
     * @param readableConfig the table options to validate
     * @throws ValidationException if exactly-once delivery is requested without a
     *     transactional id prefix
     */
    public static void validateDeliveryGuarantee(ReadableConfig readableConfig) {
        if (readableConfig.get(KafkaConnectorOptions.DELIVERY_GUARANTEE) != DeliveryGuarantee.EXACTLY_ONCE) {
            return;
        }
        if (readableConfig.getOptional(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX).isPresent()) {
            return;
        }
        throw new ValidationException(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX.key() + " must be specified when using DeliveryGuarantee.EXACTLY_ONCE.");
    }

    /** Not instantiable: this class only exposes static helpers. */
    private KafkaConnectorOptionsUtil() {
    }
}
