package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.record.Record;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.class */
public class KafkaOptions {
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";
    public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic").stringType().noDefaultValue().withDescription("Required topic name from which the table is read");
    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions.key("properties.bootstrap.servers").stringType().noDefaultValue().withDescription("Required Kafka server connection string");
    public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions.key("properties.group.id").stringType().noDefaultValue().withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode").stringType().defaultValue("group-offsets").withDescription("Optional startup mode for Kafka consumer, valid enumerations are \"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\nor \"specific-offsets\"");
    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets").stringType().noDefaultValue().withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis").longType().noDefaultValue().withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
    public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions.key("sink.partitioner").stringType().noDefaultValue().withDescription("Optional output partitioning from Flink's partitions\ninto Kafka's partitions valid enumerations are\n\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet(Arrays.asList("earliest-offset", "latest-offset", "group-offsets", "specific-offsets", "timestamp"));
    private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet(Arrays.asList("fixed", "round-robin"));

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaOptions$StartupOptions.class */
    public static class StartupOptions {
        public StartupMode startupMode;
        public Map<KafkaTopicPartition, Long> specificOffsets;
        public long startupTimestampMillis;
    }

    private KafkaOptions() {
    }

    public static void validateTableOptions(ReadableConfig readableConfig) {
        validateScanStartupMode(readableConfig);
        validateSinkPartitioner(readableConfig);
    }

    private static void validateScanStartupMode(ReadableConfig readableConfig) {
        readableConfig.getOptional(SCAN_STARTUP_MODE).map((v0) -> {
            return v0.toLowerCase();
        }).ifPresent(str -> {
            if (!SCAN_STARTUP_MODE_ENUMS.contains(str)) {
                throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]", str));
            }
            if (str.equals("timestamp") && !readableConfig.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
                throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_TIMESTAMP_MILLIS.key(), "timestamp"));
            }
            if (str.equals("specific-offsets")) {
                if (!readableConfig.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "specific-offsets"));
                }
                parseSpecificOffsets((String) readableConfig.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key());
            }
        });
    }

    private static void validateSinkPartitioner(ReadableConfig readableConfig) {
        readableConfig.getOptional(SINK_PARTITIONER).ifPresent(str -> {
            if (!SINK_PARTITIONER_ENUMS.contains(str.toLowerCase()) && str.isEmpty()) {
                throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_PARTITIONER.key()));
            }
        });
    }

    public static StartupOptions getStartupOptions(ReadableConfig readableConfig, String str) {
        HashMap hashMap = new HashMap();
        StartupMode startupMode = (StartupMode) readableConfig.getOptional(SCAN_STARTUP_MODE).map(str2 -> {
            boolean z = -1;
            switch (str2.hashCode()) {
                case -1390285235:
                    if (str2.equals("earliest-offset")) {
                        z = false;
                        break;
                    }
                    break;
                case -410146651:
                    if (str2.equals("specific-offsets")) {
                        z = 3;
                        break;
                    }
                    break;
                case 55126294:
                    if (str2.equals("timestamp")) {
                        z = 4;
                        break;
                    }
                    break;
                case 514263449:
                    if (str2.equals("latest-offset")) {
                        z = true;
                        break;
                    }
                    break;
                case 1556617458:
                    if (str2.equals("group-offsets")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return StartupMode.EARLIEST;
                case true:
                    return StartupMode.LATEST;
                case true:
                    return StartupMode.GROUP_OFFSETS;
                case Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET /* 3 */:
                    buildSpecificOffsets(readableConfig, str, hashMap);
                    return StartupMode.SPECIFIC_OFFSETS;
                case true:
                    return StartupMode.TIMESTAMP;
                default:
                    throw new TableException("Unsupported startup mode. Validator should have checked that.");
            }
        }).orElse(StartupMode.GROUP_OFFSETS);
        StartupOptions startupOptions = new StartupOptions();
        startupOptions.startupMode = startupMode;
        startupOptions.specificOffsets = hashMap;
        if (startupMode == StartupMode.TIMESTAMP) {
            startupOptions.startupTimestampMillis = ((Long) readableConfig.get(SCAN_STARTUP_TIMESTAMP_MILLIS)).longValue();
        }
        return startupOptions;
    }

    private static void buildSpecificOffsets(ReadableConfig readableConfig, String str, Map<KafkaTopicPartition, Long> map) {
        parseSpecificOffsets((String) readableConfig.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key()).forEach((num, l) -> {
            map.put(new KafkaTopicPartition(str, num.intValue()), l);
        });
    }

    public static Properties getKafkaProperties(Map<String, String> map) {
        Properties properties = new Properties();
        if (hasKafkaClientProperties(map)) {
            map.keySet().stream().filter(str -> {
                return str.startsWith(PROPERTIES_PREFIX);
            }).forEach(str2 -> {
                properties.put(str2.substring(PROPERTIES_PREFIX.length()), (String) map.get(str2));
            });
        }
        return properties;
    }

    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig readableConfig, ClassLoader classLoader) {
        return readableConfig.getOptional(SINK_PARTITIONER).flatMap(str -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -1662301013:
                    if (str.equals("round-robin")) {
                        z = true;
                        break;
                    }
                    break;
                case 97445748:
                    if (str.equals("fixed")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return Optional.of(new FlinkFixedPartitioner());
                case true:
                    return Optional.empty();
                default:
                    return Optional.of(initializePartitioner(str, classLoader));
            }
        });
    }

    public static Map<Integer, Long> parseSpecificOffsets(String str, String str2) {
        HashMap hashMap = new HashMap();
        String[] split = str.split(";");
        String format = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", str2, str);
        if (split.length == 0) {
            throw new ValidationException(format);
        }
        for (String str3 : split) {
            if (null == str3 || str3.length() == 0 || !str3.contains(",")) {
                throw new ValidationException(format);
            }
            String[] split2 = str3.split(",");
            if (split2.length != 2 || !split2[0].startsWith("partition:") || !split2[1].startsWith("offset:")) {
                throw new ValidationException(format);
            }
            try {
                hashMap.put(Integer.valueOf(split2[0].substring(split2[0].indexOf(KafkaPrincipal.SEPARATOR) + 1)), Long.valueOf(split2[1].substring(split2[1].indexOf(KafkaPrincipal.SEPARATOR) + 1)));
            } catch (NumberFormatException e) {
                throw new ValidationException(format, e);
            }
        }
        return hashMap;
    }

    private static boolean hasKafkaClientProperties(Map<String, String> map) {
        return map.keySet().stream().anyMatch(str -> {
            return str.startsWith(PROPERTIES_PREFIX);
        });
    }

    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(String str, ClassLoader classLoader) {
        try {
            if (FlinkKafkaPartitioner.class.isAssignableFrom(Class.forName(str, true, classLoader))) {
                return (FlinkKafkaPartitioner) InstantiationUtil.instantiate(str, FlinkKafkaPartitioner.class, classLoader);
            }
            throw new ValidationException(String.format("Sink partitioner class '%s' should extend from the required class %s", str, FlinkKafkaPartitioner.class.getName()));
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate partitioner class '%s'", str), e);
        }
    }
}
