/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

public class KafkaOptions {
    // ---------------------------------------------------------------------------------------------
    // Format options
    // ---------------------------------------------------------------------------------------------

    /** Format identifier for the record key; has no default value. */
    public static final ConfigOption<String> KEY_FORMAT = ConfigOptions.key("key.format")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines the format identifier for encoding key data. The identifier is used to discover a suitable format factory.");

    /** Format identifier for the record value; has no default value. */
    public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions.key("value.format")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines the format identifier for encoding value data. The identifier is used to discover a suitable format factory.");

    /** Physical columns that make up the key; defaults to an empty list. */
    public static final ConfigOption<List<String>> KEY_FIELDS = ConfigOptions.key("key.fields")
            .stringType()
            .asList()
            .defaultValues()
            .withDescription("Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined.");

    /** Strategy deciding whether key columns are also part of the value; defaults to {@link ValueFieldsStrategy#ALL}. */
    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE = ConfigOptions.key("value.fields-include")
            .enumType(ValueFieldsStrategy.class)
            .defaultValue(ValueFieldsStrategy.ALL)
            .withDescription("Defines a strategy how to deal with key columns in the data type of the value format. By default, '" + ValueFieldsStrategy.ALL + "' physical columns of the table schema will be included in the value format which means that key columns appear in the data type for both the key and value format.");

    /** Prefix that key field names carry in the table schema; has no default value. */
    public static final ConfigOption<String> KEY_FIELDS_PREFIX = ConfigOptions.key("key.fields-prefix")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and '" + KEY_FIELDS.key() + "' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that '" + VALUE_FIELDS_INCLUDE.key() + "' must be '" + ValueFieldsStrategy.EXCEPT_KEY + "'.");

    // ---------------------------------------------------------------------------------------------
    // Kafka specific options
    // ---------------------------------------------------------------------------------------------

    /** List of topic names; has no default value. */
    public static final ConfigOption<List<String>> TOPIC = ConfigOptions.key("topic")
            .stringType()
            .asList()
            .noDefaultValue()
            .withDescription("Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. Option 'topic' is required for sink.");

    /** Topic name pattern for sources; has no default value. */
    public static final ConfigOption<String> TOPIC_PATTERN = ConfigOptions.key("topic-pattern")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");

    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions.key("properties.bootstrap.servers")
            .stringType()
            .noDefaultValue()
            .withDescription("Required Kafka server connection string");

    public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions.key("properties.group.id")
            .stringType()
            .noDefaultValue()
            .withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");

    // ---------------------------------------------------------------------------------------------
    // Scan specific options
    // ---------------------------------------------------------------------------------------------

    /** Startup mode of the consumer; defaults to "group-offsets". */
    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode")
            .stringType()
            .defaultValue("group-offsets")
            .withDescription("Optional startup mode for Kafka consumer, valid enumerations are \"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\nor \"specific-offsets\"");

    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");

    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis")
            .longType()
            .noDefaultValue()
            .withDescription("Optional timestamp used in case of \"timestamp\" startup mode");

    public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY = ConfigOptions.key("scan.topic-partition-discovery.interval")
            .durationType()
            .noDefaultValue()
            .withDescription("Optional interval for consumer to discover dynamically created Kafka partitions periodically.");

    // ---------------------------------------------------------------------------------------------
    // Sink specific options
    // ---------------------------------------------------------------------------------------------

    /** Sink partitioner name or custom class name; defaults to "default". */
    public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions.key("sink.partitioner")
            .stringType()
            .defaultValue("default")
            .withDescription("Optional output partitioning from Flink's partitions\ninto Kafka's partitions valid enumerations are\n\"default\": (use kafka default partitioner to partition records),\n\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified.)\n\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");

    /** Delivery semantic of the sink; defaults to "at-least-once". */
    public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic")
            .stringType()
            .defaultValue("at-least-once")
            .withDescription("Optional semantic when commit. Valid enumerations are [\"at-least-once\", \"exactly-once\", \"none\"]");

    // ---------------------------------------------------------------------------------------------
    // Option enumerations
    // ---------------------------------------------------------------------------------------------

    // Accepted values of 'scan.startup.mode'.
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
            SCAN_STARTUP_MODE_VALUE_EARLIEST,
            SCAN_STARTUP_MODE_VALUE_LATEST,
            SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
            SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
            SCAN_STARTUP_MODE_VALUE_TIMESTAMP));

    // Built-in values of 'sink.partitioner'; any other value is treated as a class name.
    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";

    // Accepted values of 'sink.semantic'.
    public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";
    public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";
    public static final String SINK_SEMANTIC_VALUE_NONE = "none";
    private static final Set<String> SINK_SEMANTIC_ENUMS = new HashSet<>(Arrays.asList(
            SINK_SEMANTIC_VALUE_AT_LEAST_ONCE,
            SINK_SEMANTIC_VALUE_EXACTLY_ONCE,
            SINK_SEMANTIC_VALUE_NONE));

    /** Table options starting with this prefix are forwarded to the Kafka client with the prefix stripped. */
    public static final String PROPERTIES_PREFIX = "properties.";

    // Keywords of the 'partition:<n>,offset:<m>' specific-offsets syntax.
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    // Private constructor: this class only exposes static constants and static helper methods.
    private KafkaOptions() {
    }

    /**
     * Validates the options of a Kafka table source: first the topic selection, then the scan
     * startup mode.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if one of the checks fails
     */
    public static void validateTableSourceOptions(ReadableConfig tableOptions) {
        validateSourceTopic(tableOptions);
        validateScanStartupMode(tableOptions);
    }

    /**
     * Validates the options of a Kafka table sink: the topic, the partitioner and the delivery
     * semantic, in that order.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if one of the checks fails
     */
    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
        validateSinkTopic(tableOptions);
        validateSinkPartitioner(tableOptions);
        validateSinkSemantic(tableOptions);
    }

    /**
     * Checks that exactly one of 'topic' and 'topic-pattern' is configured for a source.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if both options are set, or if neither is set
     */
    public static void validateSourceTopic(ReadableConfig tableOptions) {
        final boolean hasTopic = tableOptions.getOptional(TOPIC).isPresent();
        final boolean hasPattern = tableOptions.getOptional(TOPIC_PATTERN).isPresent();

        if (hasTopic && hasPattern) {
            throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!(hasTopic || hasPattern)) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    /**
     * Checks that the sink is configured with exactly one topic in the 'topic' option.
     *
     * <p>The error message reports 'topic-pattern' when that option is set, and the value of
     * 'topic' otherwise.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if 'topic' does not contain exactly one entry
     */
    public static void validateSinkTopic(ReadableConfig tableOptions) {
        if (isSingleTopic(tableOptions)) {
            return;
        }
        final String template = "Flink Kafka sink currently only supports single topic, but got %s: %s.";
        final boolean patternConfigured = tableOptions.getOptional(TOPIC_PATTERN).isPresent();
        final String message = patternConfigured
                ? String.format(template, "'topic-pattern'", tableOptions.get(TOPIC_PATTERN))
                : String.format(template, "'topic'", tableOptions.get(TOPIC));
        throw new ValidationException(message);
    }

    /**
     * Validates 'scan.startup.mode' and the options the selected mode depends on.
     *
     * <p>The mode is compared case-insensitively. The "timestamp" mode requires
     * 'scan.startup.timestamp-millis'. The "specific-offsets" mode requires
     * 'scan.startup.specific-offsets', a single configured topic, and a parseable offsets string.
     * Nothing is checked when the mode is not explicitly set.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if the mode is unknown or a dependent option is missing or malformed
     */
    private static void validateScanStartupMode(ReadableConfig tableOptions) {
        final Optional<String> configuredMode = tableOptions.getOptional(SCAN_STARTUP_MODE);
        if (!configuredMode.isPresent()) {
            return;
        }
        // Locale.ROOT keeps the normalization independent of the JVM default locale; with a
        // Turkish default locale "TIMESTAMP" would otherwise lower-case to a dotless 'i'.
        final String mode = configuredMode.get().toLowerCase(Locale.ROOT);

        if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
            throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]", mode));
        }
        if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP) && !tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
            throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
        }
        if (!mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
            return;
        }

        if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
            throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
        }
        if (!isSingleTopic(tableOptions)) {
            throw new ValidationException("Currently Kafka source only supports specific offset for single topic.");
        }
        // The parsed result is discarded; parsing is done only to surface format errors early.
        parseSpecificOffsets(tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key());
    }

    /**
     * Validates 'sink.partitioner' when it is explicitly set.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if "round-robin" is combined with 'key.fields', or if the value
     *     is an empty string
     */
    private static void validateSinkPartitioner(ReadableConfig tableOptions) {
        final Optional<String> configured = tableOptions.getOptional(SINK_PARTITIONER);
        if (!configured.isPresent()) {
            return;
        }
        final String partitioner = configured.get();

        final boolean roundRobin = partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN);
        if (roundRobin && tableOptions.getOptional(KEY_FIELDS).isPresent()) {
            throw new ValidationException("Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
        }
        if (partitioner.isEmpty()) {
            throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_PARTITIONER.key()));
        }
    }

    /**
     * Validates 'sink.semantic' when it is explicitly set; the comparison is case-sensitive.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if the value is not one of the supported semantics
     */
    private static void validateSinkSemantic(ReadableConfig tableOptions) {
        final Optional<String> configured = tableOptions.getOptional(SINK_SEMANTIC);
        if (configured.isPresent() && !SINK_SEMANTIC_ENUMS.contains(configured.get())) {
            throw new ValidationException(String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].", configured.get(), SINK_SEMANTIC.key()));
        }
    }

    /**
     * Maps the value of 'sink.semantic' to the corresponding {@link KafkaSinkSemantic}.
     *
     * @param tableOptions the table options to read
     * @return the configured sink semantic
     * @throws TableException if the value matches none of the supported semantics
     */
    public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions) {
        final String semantic = tableOptions.get(SINK_SEMANTIC);
        if (semantic.equals(SINK_SEMANTIC_VALUE_EXACTLY_ONCE)) {
            return KafkaSinkSemantic.EXACTLY_ONCE;
        }
        if (semantic.equals(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)) {
            return KafkaSinkSemantic.AT_LEAST_ONCE;
        }
        if (semantic.equals(SINK_SEMANTIC_VALUE_NONE)) {
            return KafkaSinkSemantic.NONE;
        }
        throw new TableException("Validator should have checked that");
    }

    /**
     * Returns the topics configured through the 'topic' option.
     *
     * @param tableOptions the table options to read
     * @return the configured topic list, or {@code null} if 'topic' is not set
     */
    public static List<String> getSourceTopics(ReadableConfig tableOptions) {
        final Optional<List<String>> topics = tableOptions.getOptional(TOPIC);
        return topics.isPresent() ? topics.get() : null;
    }

    /**
     * Compiles the 'topic-pattern' option into a regular expression.
     *
     * @param tableOptions the table options to read
     * @return the compiled pattern, or {@code null} if 'topic-pattern' is not set
     */
    public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
        final Optional<String> regex = tableOptions.getOptional(TOPIC_PATTERN);
        if (!regex.isPresent()) {
            return null;
        }
        return Pattern.compile(regex.get());
    }

    /**
     * Tells whether the 'topic' option is set and lists exactly one topic.
     *
     * @param tableOptions the table options to read
     * @return {@code true} only if 'topic' is present with a single entry
     */
    private static boolean isSingleTopic(ReadableConfig tableOptions) {
        final Optional<List<String>> topics = tableOptions.getOptional(TOPIC);
        return topics.isPresent() && topics.get().size() == 1;
    }

    /**
     * Derives the consumer startup configuration from the table options.
     *
     * <p>The startup mode is matched case-insensitively, because the startup-mode validation in
     * this class lower-cases the value before checking it; a value such as "Earliest-Offset"
     * passes validation and must therefore resolve here as well. When the mode is not explicitly
     * set, {@link StartupMode#GROUP_OFFSETS} is used.
     *
     * <p>For "specific-offsets" the offsets are parsed for the first configured topic. For
     * "timestamp" the value of 'scan.startup.timestamp-millis' is copied into the result.
     *
     * @param tableOptions the (already validated) table options
     * @return the startup mode together with its mode-specific settings
     * @throws TableException if the configured mode is not one of the supported values
     */
    public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
        final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
                .map(modeString -> toStartupMode(modeString.toLowerCase(Locale.ROOT)))
                .orElse(StartupMode.GROUP_OFFSETS);

        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
            // Validation guarantees a single topic in this mode, so the first entry is the topic.
            buildSpecificOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
        }

        final StartupOptions options = new StartupOptions();
        options.startupMode = startupMode;
        options.specificOffsets = specificOffsets;
        if (startupMode == StartupMode.TIMESTAMP) {
            options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
        }
        return options;
    }

    /** Maps an already lower-cased 'scan.startup.mode' value to its {@link StartupMode}. */
    private static StartupMode toStartupMode(String normalizedMode) {
        switch (normalizedMode) {
            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
                return StartupMode.EARLIEST;
            case SCAN_STARTUP_MODE_VALUE_LATEST:
                return StartupMode.LATEST;
            case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
                return StartupMode.GROUP_OFFSETS;
            case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
                return StartupMode.SPECIFIC_OFFSETS;
            case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
                return StartupMode.TIMESTAMP;
            default:
                throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }
    }

    /**
     * Parses 'scan.startup.specific-offsets' and stores one entry per partition of the given topic
     * in {@code specificOffsets}.
     *
     * @param tableOptions the table options holding the offsets string
     * @param topic the topic the partitions belong to
     * @param specificOffsets the map that receives the parsed topic-partition offsets
     */
    private static void buildSpecificOffsets(ReadableConfig tableOptions, String topic, Map<KafkaTopicPartition, Long> specificOffsets) {
        final String rawOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
        final Map<Integer, Long> parsed = parseSpecificOffsets(rawOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
        for (Map.Entry<Integer, Long> entry : parsed.entrySet()) {
            specificOffsets.put(new KafkaTopicPartition(topic, entry.getKey()), entry.getValue());
        }
    }

    /**
     * Collects the Kafka client properties from the raw table options.
     *
     * <p>Every option whose key starts with {@link #PROPERTIES_PREFIX} is copied into the result
     * with the prefix removed from the key.
     *
     * @param tableOptions the raw table options
     * @return the extracted properties; empty if no prefixed option exists
     */
    public static Properties getKafkaProperties(Map<String, String> tableOptions) {
        final Properties kafkaProperties = new Properties();
        if (!hasKafkaClientProperties(tableOptions)) {
            return kafkaProperties;
        }
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            final String key = option.getKey();
            if (key.startsWith(PROPERTIES_PREFIX)) {
                kafkaProperties.put(key.substring(PROPERTIES_PREFIX.length()), option.getValue());
            }
        }
        return kafkaProperties;
    }

    /**
     * Resolves the 'sink.partitioner' option into a partitioner instance.
     *
     * @param tableOptions the table options to read
     * @param classLoader the class loader used to load a custom partitioner class
     * @return a {@link FlinkFixedPartitioner} for "fixed", an instance of the named class for any
     *     other non-built-in value, and an empty result for "default", "round-robin" or when the
     *     option is not set
     */
    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions, ClassLoader classLoader) {
        final Optional<String> configured = tableOptions.getOptional(SINK_PARTITIONER);
        if (!configured.isPresent()) {
            return Optional.empty();
        }
        final String partitioner = configured.get();

        if (partitioner.equals(SINK_PARTITIONER_VALUE_FIXED)) {
            return Optional.of(new FlinkFixedPartitioner<>());
        }
        if (partitioner.equals(SINK_PARTITIONER_VALUE_DEFAULT) || partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)) {
            return Optional.empty();
        }
        // Anything else is interpreted as the class name of a custom partitioner.
        return Optional.of(initializePartitioner(partitioner, classLoader));
    }

    /**
     * Parses a specific-offsets string of the form
     * {@code partition:0,offset:42;partition:1,offset:300} into a partition-to-offset map.
     *
     * @param specificOffsetsStr the raw option value
     * @param optionKey the option key, used only in error messages
     * @return the parsed offsets keyed by partition id
     * @throws ValidationException if an entry does not follow the expected format or contains a
     *     non-numeric partition or offset
     */
    public static Map<Integer, Long> parseSpecificOffsets(String specificOffsetsStr, String optionKey) {
        final Map<Integer, Long> parsed = new HashMap<>();
        final String[] entries = specificOffsetsStr.split(";");
        final String errorMessage = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", optionKey, specificOffsetsStr);
        if (entries.length == 0) {
            throw new ValidationException(errorMessage);
        }

        final String partitionPrefix = PARTITION + ':';
        final String offsetPrefix = OFFSET + ':';
        for (String entry : entries) {
            if (entry.isEmpty() || entry.indexOf(',') < 0) {
                throw new ValidationException(errorMessage);
            }
            final String[] fields = entry.split(",");
            final boolean wellFormed = fields.length == 2
                    && fields[0].startsWith(partitionPrefix)
                    && fields[1].startsWith(offsetPrefix);
            if (!wellFormed) {
                throw new ValidationException(errorMessage);
            }
            try {
                // Both fields start with their prefix, so the number begins right after it.
                parsed.put(
                        Integer.valueOf(fields[0].substring(partitionPrefix.length())),
                        Long.valueOf(fields[1].substring(offsetPrefix.length())));
            } catch (NumberFormatException e) {
                throw new ValidationException(errorMessage, e);
            }
        }
        return parsed;
    }

    /**
     * Tells whether at least one option key starts with {@link #PROPERTIES_PREFIX}.
     *
     * @param tableOptions the raw table options
     * @return {@code true} if a prefixed key exists
     */
    private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
        for (String key : tableOptions.keySet()) {
            if (key.startsWith(PROPERTIES_PREFIX)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Loads the named class and instantiates it as a {@link FlinkKafkaPartitioner}.
     *
     * @param name the class name of the partitioner
     * @param classLoader the class loader used to resolve the class
     * @return a new instance of the named partitioner class
     * @throws ValidationException if the class cannot be found or instantiated, or if it is not a
     *     {@link FlinkKafkaPartitioner}
     */
    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(String name, ClassLoader classLoader) {
        try {
            final Class<?> candidate = Class.forName(name, true, classLoader);
            if (FlinkKafkaPartitioner.class.isAssignableFrom(candidate)) {
                @SuppressWarnings("unchecked")
                final FlinkKafkaPartitioner<T> partitioner = InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
                return partitioner;
            }
            throw new ValidationException(String.format("Sink partitioner class '%s' should extend from the required class %s", name, FlinkKafkaPartitioner.class.getName()));
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate partitioner class '%s'", name), e);
        }
    }

    /**
     * Computes the positions of the key fields within the physical row type.
     *
     * <p>'key.fields' may only be set together with 'key.format', and a key format requires at
     * least one key field. Each key field must be a physical column and must start with the
     * configured 'key.fields-prefix' (empty when the prefix is not set).
     *
     * @param options the table options to read
     * @param physicalDataType the physical data type of the table; must be a row type
     * @return the indices of the key fields in declaration order of 'key.fields'; empty if no key
     *     format is defined
     * @throws ValidationException if the key options are inconsistent, a key field is not a
     *     physical column, or a key field lacks the configured prefix
     * @throws IllegalArgumentException if {@code physicalDataType} is not a row type
     */
    public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
        final LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected.");

        // Parameterized types are required here: with raw types the lambda below would receive
        // Object elements and could not call String methods on the field names.
        final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
        final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);

        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
            throw new ValidationException(String.format("The option '%s' can only be declared if a key format is defined using '%s'.", KEY_FIELDS.key(), KEY_FORMAT.key()));
        }
        if (optionalKeyFormat.isPresent() && (!optionalKeyFields.isPresent() || optionalKeyFields.get().isEmpty())) {
            throw new ValidationException(String.format("A key format '%s' requires the declaration of one or more of key fields using '%s'.", KEY_FORMAT.key(), KEY_FIELDS.key()));
        }
        if (!optionalKeyFormat.isPresent()) {
            return new int[0];
        }

        final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
        final List<String> keyFields = optionalKeyFields.get();
        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
        return keyFields.stream()
                .mapToInt(keyField -> {
                    final int pos = physicalFields.indexOf(keyField);
                    if (pos < 0) {
                        throw new ValidationException(String.format("Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option:\n%s", keyField, KEY_FIELDS.key(), physicalFields));
                    }
                    if (!keyField.startsWith(keyPrefix)) {
                        throw new ValidationException(String.format("All fields in '%s' must be prefixed with '%s' when option '%s' is set but field '%s' is not prefixed.", KEY_FIELDS.key(), keyPrefix, KEY_FIELDS_PREFIX.key(), keyField));
                    }
                    return pos;
                })
                .toArray();
    }

    /**
     * Computes the positions of the value fields within the physical row type.
     *
     * <p>With {@link ValueFieldsStrategy#ALL} every physical field is part of the value and a key
     * prefix is rejected. With {@link ValueFieldsStrategy#EXCEPT_KEY} the fields of the key
     * projection are left out.
     *
     * @param options the table options to read
     * @param physicalDataType the physical data type of the table; must be a row type
     * @return the indices of the value fields in ascending order
     * @throws ValidationException if a key prefix is combined with the {@code ALL} strategy
     * @throws TableException if the strategy is neither {@code ALL} nor {@code EXCEPT_KEY}
     * @throws IllegalArgumentException if {@code physicalDataType} is not a row type
     */
    public static int[] createValueFormatProjection(ReadableConfig options, DataType physicalDataType) {
        final LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected.");

        final int fieldCount = LogicalTypeChecks.getFieldCount(physicalType);
        final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
        final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);

        if (strategy == ValueFieldsStrategy.ALL) {
            if (!keyPrefix.isEmpty()) {
                throw new ValidationException(String.format("A key prefix is not allowed when option '%s' is set to '%s'. Set it to '%s' instead to avoid field overlaps.", VALUE_FIELDS_INCLUDE.key(), ValueFieldsStrategy.ALL, ValueFieldsStrategy.EXCEPT_KEY));
            }
            return IntStream.range(0, fieldCount).toArray();
        }
        if (strategy != ValueFieldsStrategy.EXCEPT_KEY) {
            throw new TableException("Unknown value fields strategy:" + strategy);
        }

        // Keep every physical position that is not claimed by the key projection.
        final Set<Integer> keyPositions = new HashSet<>();
        for (int keyPosition : createKeyFormatProjection(options, physicalDataType)) {
            keyPositions.add(keyPosition);
        }
        return IntStream.range(0, fieldCount)
                .filter(pos -> !keyPositions.contains(pos))
                .toArray();
    }

    /**
     * Strategies for the 'value.fields-include' option, which controls how key columns are treated
     * when the value format projection is computed.
     */
    public static enum ValueFieldsStrategy {
        // Every physical column is part of the value format, key columns included.
        ALL,
        // Physical columns that belong to the key projection are left out of the value format.
        EXCEPT_KEY;

    }

    /** Mutable holder for the Kafka consumer startup settings derived from the table options. */
    public static class StartupOptions {
        // The startup mode resolved from 'scan.startup.mode'.
        public StartupMode startupMode;
        // Per-partition start offsets; filled by getStartupOptions only for the specific-offsets mode.
        public Map<KafkaTopicPartition, Long> specificOffsets;
        // Start timestamp in milliseconds; assigned by getStartupOptions only for the timestamp mode.
        public long startupTimestampMillis;
    }
}

