/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public abstract class BasePulsarSubscriber
implements PulsarSubscriber {
    private static final long serialVersionUID = 2053021503331058888L;
    private static final Set<String> NON_PARTITIONED_TOPICS = ConcurrentHashMap.newKeySet();
    protected transient PulsarClient client;

    protected TopicMetadata queryTopicMetadata(String topic) throws ExecutionException, InterruptedException {
        if (NON_PARTITIONED_TOPICS.contains(topic)) {
            return new TopicMetadata(topic, 0);
        }
        PulsarClientImpl clientImpl = (PulsarClientImpl)this.client;
        PartitionedTopicMetadata metadata = (PartitionedTopicMetadata)clientImpl.getPartitionedTopicMetadata(topic).get();
        if (metadata.partitions == 0) {
            NON_PARTITIONED_TOPICS.add(topic);
        }
        return new TopicMetadata(topic, metadata.partitions);
    }

    protected Set<TopicPartition> createTopicPartitions(Set<String> topics, RangeGenerator generator, int parallelism) throws ExecutionException, InterruptedException {
        HashSet<TopicPartition> results = new HashSet<TopicPartition>();
        for (String topic : topics) {
            TopicMetadata metadata = this.queryTopicMetadata(topic);
            if (metadata == null) continue;
            List<TopicRange> ranges = generator.range(metadata, parallelism);
            if (!metadata.isPartitioned()) {
                results.add(new TopicPartition(metadata.getName(), ranges));
                continue;
            }
            for (int i = 0; i < metadata.getPartitionSize(); ++i) {
                results.add(new TopicPartition(metadata.getName(), i, ranges));
            }
        }
        return results;
    }

    @Override
    public void open(PulsarClient client) {
        this.client = client;
    }
}

