/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.BasePulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

/**
 * Subscriber that resolves its topics by listing one namespace and keeping the topics whose
 * name matches a regular expression.
 *
 * <p>The namespace and the pattern are both derived from the {@link Pattern} passed to the
 * constructor: the pattern text is parsed as a {@link TopicName}, the namespace is taken from it,
 * and everything after the {@code "://"} separator is recompiled as the pattern used for matching.
 */
public class TopicPatternSubscriber extends BasePulsarSubscriber {
    private static final long serialVersionUID = 3307710093243745104L;

    /** Separator between the topic domain prefix and the rest of a full topic name. */
    private static final String SCHEME_SEPARATOR = "://";

    /** The topic pattern without its leading domain prefix (the part after {@code "://"}). */
    private final Pattern shortenedPattern;

    /** String form of the namespace that is listed when querying topics. */
    private final String namespace;

    /** Lookup mode passed to the topic listing request. */
    private final CommandGetTopicsOfNamespace.Mode subscriptionMode;

    /**
     * Creates a subscriber for the given topic pattern.
     *
     * @param topicPattern the pattern whose text is parsed as a topic name to obtain both the
     *     namespace and the shortened matching pattern
     * @param subscriptionMode which kind of topics should be listed
     * @throws IllegalArgumentException if {@code subscriptionMode} is not one of the supported
     *     modes
     */
    public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) {
        TopicName destination = TopicName.get(topicPattern.pattern());
        String pattern = destination.toString();
        this.shortenedPattern = Pattern.compile(pattern.split(SCHEME_SEPARATOR)[1]);
        this.namespace = destination.getNamespaceObject().toString();
        this.subscriptionMode = this.convertRegexSubscriptionMode(subscriptionMode);
    }

    /**
     * Queries the matching topics and turns them into topic partitions.
     *
     * @param generator the range generator handed to {@code createTopicPartitions}
     * @param parallelism the parallelism handed to {@code createTopicPartitions}
     * @return the topic partitions created from the matching topics
     * @throws Exception if the topic query or the partition creation fails
     */
    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(RangeGenerator generator, int parallelism) throws Exception {
        Set<String> topics = this.queryTopicsByInternalProtocols();
        return this.createTopicPartitions(topics, generator, parallelism);
    }

    /**
     * Lists the topics under the configured namespace through the client's lookup service and
     * keeps the non-internal ones that match the pattern.
     *
     * @return the names of the matching topics
     * @throws PulsarClientException if the lookup fails or the calling thread is interrupted
     *     while waiting for the result
     * @throws NullPointerException if the client has not been set on this subscriber
     */
    private Set<String> queryTopicsByInternalProtocols() throws PulsarClientException {
        Preconditions.checkNotNull(this.client, "This subscriber doesn't initialize properly.");
        LookupService lookupService = ((PulsarClientImpl) this.client).getLookup();
        NamespaceName namespaceName = NamespaceName.get(this.namespace);
        try {
            // Only patterns ending in ".*" are forwarded with the request; any other pattern is
            // replaced by null and matched locally below.
            // NOTE(review): presumably a workaround for limited server-side regex support - confirm.
            String queryPattern = this.shortenedPattern.toString();
            if (!queryPattern.endsWith(".*")) {
                queryPattern = null;
            }

            GetTopicsResult topicsResult = lookupService
                    .getTopicsUnderNamespace(namespaceName, this.subscriptionMode, queryPattern, null)
                    .get();
            // Typed as List<String>: iterating a raw List as String does not compile.
            List<String> topics = topicsResult.getTopics();
            Set<String> results = new HashSet<>(topics.size());
            for (String topic : topics) {
                if (TopicNameUtils.isInternal(topic)) {
                    continue;
                }
                // Apply the pattern locally only when the result is not marked as filtered.
                if (topicsResult.isFiltered() || this.matchesTopicPattern(topic)) {
                    results.add(topic);
                }
            }
            return results;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure to the caller.
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        } catch (ExecutionException e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Checks whether the part of the topic name after {@code "://"} fully matches the shortened
     * pattern.
     *
     * @param topic the full topic name, expected to contain {@code "://"}
     * @return {@code true} if the shortened topic name matches the pattern
     */
    private boolean matchesTopicPattern(String topic) {
        String shortenedTopic = topic.split(SCHEME_SEPARATOR)[1];
        return this.shortenedPattern.matcher(shortenedTopic).matches();
    }

    /**
     * Maps the client-facing subscription mode to the mode used by the topic listing request.
     *
     * @param subscriptionMode the mode to convert
     * @return the corresponding {@link CommandGetTopicsOfNamespace.Mode}
     * @throws IllegalArgumentException if the mode has no mapping
     */
    private CommandGetTopicsOfNamespace.Mode convertRegexSubscriptionMode(RegexSubscriptionMode subscriptionMode) {
        switch (subscriptionMode) {
            case AllTopics:
                return CommandGetTopicsOfNamespace.Mode.ALL;
            case PersistentOnly:
                return CommandGetTopicsOfNamespace.Mode.PERSISTENT;
            case NonPersistentOnly:
                return CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT;
            default:
                throw new IllegalArgumentException("We don't support such subscription mode " + subscriptionMode);
        }
    }
}

