/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarTableOptions {
    private static final Logger log = LoggerFactory.getLogger(PulsarTableOptions.class);
    public static final ConfigOption<String> SERVICE_URL = ConfigOptions.key((String)"service-url").stringType().noDefaultValue().withDescription("Required pulsar server connection string");
    public static final ConfigOption<String> ADMIN_URL = ConfigOptions.key((String)"admin-url").stringType().noDefaultValue().withDescription("Required pulsar admin connection string");
    public static final ConfigOption<List<String>> TOPIC = ConfigOptions.key((String)"topic").stringType().asList().noDefaultValue().withDescription("Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. Option 'topic' is required for sink.");
    public static final ConfigOption<String> TOPIC_PATTERN = ConfigOptions.key((String)"topic-pattern").stringType().noDefaultValue().withDescription("Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");
    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key((String)"scan.startup.mode").stringType().defaultValue((Object)"latest").withDescription("Optional startup mode for Pulsar consumer, valid enumerations are \"earliest\", \"latest\", \"external-subscription\",\nor \"specific-offsets\"");
    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key((String)"scan.startup.specific-offsets").stringType().noDefaultValue().withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
    public static final ConfigOption<String> SCAN_STARTUP_SUB_NAME = ConfigOptions.key((String)"scan.startup.sub-name").stringType().noDefaultValue().withDescription("Optional sub-name used in case of \"specific-offsets\" startup mode");
    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key((String)"scan.startup.timestamp-millis").longType().noDefaultValue().withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
    public static final ConfigOption<String> PULSAR_READER_READER_NAME = ConfigOptions.key((String)"pulsar.reader.readerName").stringType().noDefaultValue().withDescription("Optional pulsar reader readerName of \"readerName\"");
    public static final ConfigOption<String> PULSAR_READER_SUBSCRIPTION_ROLE_PREFIX = ConfigOptions.key((String)"pulsar.reader.subscriptionRolePrefix").stringType().noDefaultValue().withDescription("Optional pulsar reader subscriptionRolePrefix of \"subscriptionRolePrefix\"");
    public static final ConfigOption<String> PULSAR_READER_RECEIVER_QUEUE_SIZE = ConfigOptions.key((String)"pulsar.reader.receiver-queue-size").stringType().noDefaultValue().withDescription("Optional pulsar reader receiverQueueSize of \"receiver-queue-size\"");
    public static final ConfigOption<String> PARTITION_DISCOVERY_INTERVAL_MILLIS = ConfigOptions.key((String)"partition.discovery.interval-millis").stringType().noDefaultValue().withDescription("Optional discovery topic interval of \"interval-millis\" millis");
    public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions.key((String)"sink.partitioner").stringType().noDefaultValue().withDescription("Optional output partitioning from Flink's partitions\ninto pulsar's partitions valid enumerations are\n\"fixed\": (each Flink partition ends up in at most one pulsar partition),\n\"round-robin\": (a Flink partition is distributed to pulsar partitions round-robin)\n\"custom class name\": (use a custom FlinkPulsarPartitioner subclass)");
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest";
    public static final String SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION = "external-subscription";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<String>(Arrays.asList("earliest", "latest", "external-subscription", "specific-offsets"));
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
    private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<String>(Arrays.asList("fixed", "round-robin"));
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    private PulsarTableOptions() {
    }

    public static void validateTableSourceOptions(ReadableConfig tableOptions) {
        PulsarTableOptions.validateSourceTopic(tableOptions);
        PulsarTableOptions.validateScanStartupMode(tableOptions);
    }

    public static void validateSourceTopic(ReadableConfig tableOptions) {
        Optional topic = tableOptions.getOptional(TOPIC);
        Optional pattern = tableOptions.getOptional(TOPIC_PATTERN);
        if (topic.isPresent() && pattern.isPresent()) {
            throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!topic.isPresent() && !pattern.isPresent()) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    private static void validateScanStartupMode(ReadableConfig tableOptions) {
        tableOptions.getOptional(SCAN_STARTUP_MODE).map(String::toLowerCase).ifPresent(mode -> {
            if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
                throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest, latest, specific-offsets, external-subscription]", mode));
            }
            if (mode.equals(SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION) && !tableOptions.getOptional(SCAN_STARTUP_SUB_NAME).isPresent()) {
                throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SUB_NAME.key(), SCAN_STARTUP_SUB_NAME));
            }
            if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
                if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
                }
                String specificOffsets = (String)tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
                PulsarTableOptions.parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
            }
        });
    }

    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
        PulsarTableOptions.validateSinkTopic(tableOptions);
    }

    public static void validateSinkPartitioner(ReadableConfig tableOptions) {
        tableOptions.getOptional(SINK_PARTITIONER).ifPresent(partitioner -> {
            if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase()) && partitioner.isEmpty()) {
                throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_PARTITIONER.key()));
            }
        });
    }

    public static void validateSinkTopic(ReadableConfig tableOptions) {
        String errorMessageTemp = "Flink Pulsar sink currently only supports single topic, but got %s: %s.";
        if (!PulsarTableOptions.isSingleTopic(tableOptions)) {
            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
                throw new ValidationException(String.format(errorMessageTemp, "'topic-pattern'", tableOptions.get(TOPIC_PATTERN)));
            }
            throw new ValidationException(String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC)));
        }
    }

    private static boolean isSingleTopic(ReadableConfig tableOptions) {
        return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false);
    }

    public static StartupOptions getStartupOptions(ReadableConfig tableOptions, List<String> topics) {
        HashMap<String, MessageId> specificOffsets = new HashMap<String, MessageId>();
        ArrayList subName = new ArrayList(1);
        StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE).map(modeString -> {
            switch (modeString) {
                case "earliest": {
                    return StartupMode.EARLIEST;
                }
                case "latest": {
                    return StartupMode.LATEST;
                }
                case "specific-offsets": {
                    String specificOffsetsStrOpt = (String)tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
                    Map<Integer, String> offsetList = PulsarTableOptions.parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
                    offsetList.forEach((partition, offset) -> {
                        try {
                            MessageIdImpl messageId = PulsarTableOptions.parseMessageId(offset);
                            specificOffsets.put(partition.toString(), (MessageId)messageId);
                        }
                        catch (Exception e) {
                            log.error("Failed to decode message id from properties {}", (Object)ExceptionUtils.stringifyException((Throwable)e));
                            throw new RuntimeException(e);
                        }
                    });
                    return StartupMode.SPECIFIC_OFFSETS;
                }
                case "external-subscription": {
                    subName.add(tableOptions.get(SCAN_STARTUP_SUB_NAME));
                    return StartupMode.EXTERNAL_SUBSCRIPTION;
                }
            }
            throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }).orElse(StartupMode.LATEST);
        StartupOptions options = new StartupOptions();
        options.startupMode = startupMode;
        options.specificOffsets = specificOffsets;
        if (subName.size() != 0) {
            options.externalSubscriptionName = (String)subName.get(0);
        }
        return options;
    }

    private static MessageIdImpl parseMessageId(String offset) {
        String[] split = offset.split(":");
        return new MessageIdImpl(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]));
    }

    public static Map<Integer, String> parseSpecificOffsets(String specificOffsetsStr, String optionKey) {
        HashMap<Integer, String> offsetMap = new HashMap<Integer, String>();
        String[] pairs = specificOffsetsStr.split(";");
        String validationExceptionMessage = String.format("Invalid properties '%s' should follow the format messageId with partition'42:1012:0;44:1011:1', but is '%s'.", optionKey, specificOffsetsStr);
        if (pairs.length == 0) {
            throw new ValidationException(validationExceptionMessage);
        }
        for (String pair : pairs) {
            if (null == pair || pair.length() == 0) break;
            if (pair.contains(",")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String[] kv = pair.split(":");
            if (kv.length != 3) {
                throw new ValidationException(validationExceptionMessage);
            }
            String partitionValue = kv[2];
            String offsetValue = pair;
            try {
                Integer partition = Integer.valueOf(partitionValue);
                offsetMap.put(partition, offsetValue);
            }
            catch (NumberFormatException e) {
                throw new ValidationException(validationExceptionMessage, (Throwable)e);
            }
        }
        return offsetMap;
    }

    public static class StartupOptions {
        public StartupMode startupMode;
        public Map<String, MessageId> specificOffsets;
        public String externalSubscriptionName;

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof StartupOptions)) {
                return false;
            }
            StartupOptions other = (StartupOptions)o;
            if (!other.canEqual(this)) {
                return false;
            }
            StartupMode this$startupMode = this.startupMode;
            StartupMode other$startupMode = other.startupMode;
            if (this$startupMode == null ? other$startupMode != null : !((Object)((Object)this$startupMode)).equals((Object)other$startupMode)) {
                return false;
            }
            Map<String, MessageId> this$specificOffsets = this.specificOffsets;
            Map<String, MessageId> other$specificOffsets = other.specificOffsets;
            if (this$specificOffsets == null ? other$specificOffsets != null : !((Object)this$specificOffsets).equals(other$specificOffsets)) {
                return false;
            }
            String this$externalSubscriptionName = this.externalSubscriptionName;
            String other$externalSubscriptionName = other.externalSubscriptionName;
            return !(this$externalSubscriptionName == null ? other$externalSubscriptionName != null : !this$externalSubscriptionName.equals(other$externalSubscriptionName));
        }

        protected boolean canEqual(Object other) {
            return other instanceof StartupOptions;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            StartupMode $startupMode = this.startupMode;
            result = result * 59 + ($startupMode == null ? 43 : ((Object)((Object)$startupMode)).hashCode());
            Map<String, MessageId> $specificOffsets = this.specificOffsets;
            result = result * 59 + ($specificOffsets == null ? 43 : ((Object)$specificOffsets).hashCode());
            String $externalSubscriptionName = this.externalSubscriptionName;
            result = result * 59 + ($externalSubscriptionName == null ? 43 : $externalSubscriptionName.hashCode());
            return result;
        }
    }
}

