/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.com.google.common.collect.Iterables;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin wrapper around a {@link PulsarAdmin} client used by one source/sink subtask to
 * discover topics, manage namespaces, topics and schemas, and to create, reset and delete
 * the subscription cursors identified by {@code subscriptionName}.
 *
 * <p>The reader owns the admin client it creates and closes it in {@link #close()}.
 */
public class PulsarMetadataReader implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataReader.class);

    private final String adminUrl;
    private final String subscriptionName;
    private final Map<String, String> caseInsensitiveParams;
    private final int indexOfThisSubtask;
    private final int numParallelSubtasks;
    private final PulsarAdmin admin;
    private volatile boolean closed = false;
    /** Topic partitions returned by the most recent {@link #discoverTopicChanges()} call. */
    private Set<TopicRange> seenTopics = new HashSet<>();
    private final boolean useExternalSubscription;
    /** Key hash range attached to every {@link TopicRange} produced by {@link #getTopics()}. */
    private final SerializableRange range;

    /**
     * Creates a reader and its underlying admin client.
     *
     * @param adminUrl service URL of the Pulsar admin endpoint
     * @param clientConf client configuration used to build the admin client
     * @param subscriptionName base name of the subscription managed by this reader
     * @param caseInsensitiveParams connector options; may be {@code null}
     * @param indexOfThisSubtask index of the subtask owning this reader
     * @param numParallelSubtasks total number of parallel subtasks
     * @param useExternalSubscription when {@code true}, subscriptions are never removed by
     *     {@link #removeCursor(Set)} and are only created by
     *     {@link #setupCursor(Map, boolean)} if {@code failOnDataLoss} is {@code false}
     * @throws PulsarClientException if the admin client cannot be created
     */
    public PulsarMetadataReader(String adminUrl, ClientConfigurationData clientConf, String subscriptionName, Map<String, String> caseInsensitiveParams, int indexOfThisSubtask, int numParallelSubtasks, boolean useExternalSubscription) throws PulsarClientException {
        this.adminUrl = adminUrl;
        this.subscriptionName = subscriptionName;
        this.caseInsensitiveParams = caseInsensitiveParams;
        this.indexOfThisSubtask = indexOfThisSubtask;
        this.numParallelSubtasks = numParallelSubtasks;
        this.useExternalSubscription = useExternalSubscription;
        this.admin = PulsarClientUtils.newAdminFromConf(adminUrl, clientConf);
        // Must run after the subtask fields above are assigned: buildRange reads them.
        this.range = this.buildRange(caseInsensitiveParams);
    }

    /**
     * Computes the key hash range served by this subtask.
     *
     * <p>The full range is used unless the subtask coordinates are valid and the
     * {@code enable-key-hash-range} option is present and parses as {@code true}.
     */
    private SerializableRange buildRange(Map<String, String> caseInsensitiveParams) {
        if (numParallelSubtasks <= 0 || indexOfThisSubtask < 0) {
            return SerializableRange.ofFullRange();
        }
        if (caseInsensitiveParams == null || caseInsensitiveParams.isEmpty() || !caseInsensitiveParams.containsKey("enable-key-hash-range")) {
            return SerializableRange.ofFullRange();
        }
        String enableKeyHashRange = caseInsensitiveParams.get("enable-key-hash-range");
        if (!Boolean.parseBoolean(enableKeyHashRange)) {
            return SerializableRange.ofFullRange();
        }
        Range subtaskRange = SourceSinkUtils.distributeRange(numParallelSubtasks, indexOfThisSubtask);
        return SerializableRange.of(subtaskRange);
    }

    /**
     * Creates a reader that does not use an external subscription.
     *
     * @throws PulsarClientException if the admin client cannot be created
     */
    public PulsarMetadataReader(String adminUrl, ClientConfigurationData clientConf, String subscriptionName, Map<String, String> caseInsensitiveParams, int indexOfThisSubtask, int numParallelSubtasks) throws PulsarClientException {
        this(adminUrl, clientConf, subscriptionName, caseInsensitiveParams, indexOfThisSubtask, numParallelSubtasks, false);
    }

    /** Marks the reader as closed and closes the underlying admin client. */
    @Override
    public void close() {
        closed = true;
        admin.close();
    }

    /**
     * Returns the topic partitions assigned to this subtask that were not present in the
     * previous call, and remembers the current assignment for the next call.
     *
     * @return a snapshot of the newly discovered topic partitions
     * @throws PulsarAdminException if the topic listing fails
     * @throws ClosedException if the reader has already been closed
     */
    public Set<TopicRange> discoverTopicChanges() throws PulsarAdminException, ClosedException {
        if (closed) {
            throw new ClosedException();
        }
        Set<TopicRange> currentTopics = getTopicPartitions();
        // Copy the difference instead of handing out Guava's lazy view, which would be
        // recomputed against both backing sets on every access by the caller.
        Set<TopicRange> addedTopics = new HashSet<>(Sets.difference(currentTopics, seenTopics));
        seenTopics = currentTopics;
        return addedTopics;
    }

    /**
     * Lists the namespaces of every tenant returned by the admin client.
     *
     * @throws PulsarAdminException if a tenant or namespace listing fails
     */
    public List<String> listNamespaces() throws PulsarAdminException {
        List<String> tenants = admin.tenants().getTenants();
        List<String> namespaces = new ArrayList<>();
        for (String tenant : tenants) {
            namespaces.addAll(admin.namespaces().getNamespaces(tenant));
        }
        return namespaces;
    }

    /**
     * Checks whether a namespace exists by listing its topics.
     *
     * @return {@code false} if the listing reports "not found", {@code true} otherwise
     * @throws PulsarAdminException for any other admin failure
     */
    public boolean namespaceExists(String ns) throws PulsarAdminException {
        try {
            admin.namespaces().getTopics(ns);
        } catch (PulsarAdminException.NotFoundException e) {
            return false;
        }
        return true;
    }

    /**
     * Creates the namespace named by {@code ns}.
     *
     * @throws PulsarAdminException if the admin call fails
     */
    public void createNamespace(String ns) throws PulsarAdminException {
        String nsName = NamespaceName.get(ns).toString();
        admin.namespaces().createNamespace(nsName);
    }

    /**
     * Deletes the namespace named by {@code ns}.
     *
     * @throws PulsarAdminException if the admin call fails
     */
    public void deleteNamespace(String ns) throws PulsarAdminException {
        String nsName = NamespaceName.get(ns).toString();
        admin.namespaces().deleteNamespace(nsName);
    }

    /**
     * Returns the local names of all topics in a namespace, partitioned topics first,
     * followed by non-partitioned topics.
     *
     * @throws PulsarAdminException if a topic listing fails
     */
    public List<String> getTopics(String ns) throws PulsarAdminException {
        List<String> nonPartitionedTopics = getNonPartitionedTopics(ns);
        List<String> partitionedTopics = admin.topics().getPartitionedTopicList(ns);
        return Stream.concat(partitionedTopics.stream(), nonPartitionedTopics.stream())
                .map(t -> TopicName.get(t).getLocalName())
                .collect(Collectors.toList());
    }

    /**
     * Checks whether a topic exists.
     *
     * <p>A topic with a positive partition count exists; otherwise its stats are queried.
     * A "not found" response from either call is reported as {@code false}, mirroring
     * {@link #namespaceExists(String)}.
     *
     * @return {@code true} if the topic exists, {@code false} if the broker reports it as
     *     not found
     * @throws PulsarAdminException for any other admin failure
     */
    public boolean topicExists(String topicName) throws PulsarAdminException {
        try {
            int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
            if (partitionNum > 0) {
                return true;
            }
            // Not partitioned: the stats lookup fails with NotFoundException when absent.
            admin.topics().getStats(topicName);
            return true;
        } catch (PulsarAdminException.NotFoundException e) {
            return false;
        }
    }

    /**
     * Force-deletes a topic, using the partitioned-topic API when the topic has a positive
     * partition count and the plain delete otherwise.
     *
     * @throws PulsarAdminException if the metadata lookup or deletion fails
     */
    public void deleteTopic(String topicName) throws PulsarAdminException {
        int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
        if (partitionNum > 0) {
            admin.topics().deletePartitionedTopic(topicName, true);
        } else {
            admin.topics().delete(topicName, true);
        }
    }

    /**
     * Creates a partitioned topic with {@code defaultPartitionNum} partitions.
     *
     * @throws PulsarAdminException if the admin call fails
     * @throws IncompatibleSchemaException declared for callers; not raised by this body
     */
    public void createTopic(String topicName, int defaultPartitionNum) throws PulsarAdminException, IncompatibleSchemaException {
        admin.topics().createPartitionedTopic(topicName, defaultPartitionNum);
    }

    /**
     * Uploads {@code schemaInfo} for a topic through {@link SchemaUtils#uploadPulsarSchema}.
     *
     * @throws IncompatibleSchemaException propagated from the upload helper
     */
    public void putSchema(String topicName, SchemaInfo schemaInfo) throws IncompatibleSchemaException {
        SchemaUtils.uploadPulsarSchema(admin, topicName, schemaInfo);
    }

    /**
     * Creates this reader's subscription on every topic in {@code offset}, positioned at
     * the mapped message id.
     *
     * <p>Nothing is done when an external subscription is used and {@code failOnDataLoss}
     * is {@code true}. A subscription that already exists is logged and skipped.
     *
     * @param offset start position per topic range
     * @param failOnDataLoss see above
     * @throws RuntimeException if creating a subscription fails for any other reason
     */
    public void setupCursor(Map<TopicRange, MessageId> offset, boolean failOnDataLoss) {
        if (useExternalSubscription && failOnDataLoss) {
            return;
        }
        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
            TopicRange topicRange = entry.getKey();
            MessageId position = entry.getValue();
            try {
                log.info("Setting up subscription {} on topic {} at position {}", subscriptionName, topicRange, position);
                admin.topics().createSubscription(topicRange.getTopic(), subscriptionNameFrom(topicRange), position);
                log.info("Subscription {} on topic {} at position {} finished", subscriptionName, topicRange, position);
            } catch (PulsarAdminException.ConflictException e) {
                log.info("Subscription {} on topic {} already exists", subscriptionName, topicRange);
            } catch (PulsarAdminException e) {
                throw new RuntimeException(String.format("Failed to set up cursor for %s ", topicRange.toString()), e);
            }
        }
    }

    /** Equivalent to {@link #setupCursor(Map, boolean)} with {@code failOnDataLoss = true}. */
    public void setupCursor(Map<TopicRange, MessageId> offset) {
        setupCursor(offset, true);
    }

    /**
     * Resets this reader's subscription cursor on every topic in {@code offset} to the
     * mapped message id.
     *
     * <p>Admin failures with status 404 or 412 are logged and skipped.
     *
     * @throws RuntimeException wrapping any other failure
     */
    public void commitCursorToOffset(Map<TopicRange, MessageId> offset) {
        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
            TopicRange tp = entry.getKey();
            MessageId position = entry.getValue();
            try {
                log.info("Committing offset {} to topic {}", position, tp);
                admin.topics().resetCursor(tp.getTopic(), subscriptionNameFrom(tp), position);
                log.info("Successfully committed offset {} to topic {}", position, tp);
            } catch (Throwable e) {
                // Catch-all kept so every failure is reported together with its topic.
                if (e instanceof PulsarAdminException) {
                    int status = ((PulsarAdminException) e).getStatusCode();
                    if (status == 404 || status == 412) {
                        log.info("Cannot commit cursor since the topic {} has been deleted during execution", tp);
                        continue;
                    }
                }
                throw new RuntimeException(String.format("Failed to commit cursor for %s", tp), e);
            }
        }
    }

    /**
     * Deletes this reader's subscription from every given topic. Does nothing when an
     * external subscription is used. Admin failures with status 404 are logged and skipped.
     *
     * @throws RuntimeException wrapping any other failure
     */
    public void removeCursor(Set<TopicRange> topics) {
        if (useExternalSubscription) {
            return;
        }
        for (TopicRange topicRange : topics) {
            String topic = topicRange.getTopic();
            try {
                log.info("Removing subscription {} from topic {}", subscriptionName, topic);
                admin.topics().deleteSubscription(topic, subscriptionNameFrom(topicRange));
                log.info("Successfully removed subscription {} from topic {}", subscriptionName, topic);
            } catch (Throwable e) {
                // Catch-all kept so every failure is reported together with its topic.
                if (e instanceof PulsarAdminException && ((PulsarAdminException) e).getStatusCode() == 404) {
                    log.info("Cannot remove cursor since the topic {} has been deleted during execution", topic);
                    continue;
                }
                throw new RuntimeException(String.format("Failed to remove cursor for %s", topicRange.toString()), e);
            }
        }
    }

    /**
     * Returns the subscription name for a topic range: the base name for a full range,
     * otherwise the base name with the range appended.
     */
    private String subscriptionNameFrom(TopicRange topicRange) {
        if (topicRange.isFullRange()) {
            return subscriptionName;
        }
        return subscriptionName + topicRange.getPulsarRange();
    }

    /**
     * Resolves the position at which reading of {@code topic} should start.
     *
     * <p>If the subscription already exists, the position is derived from the cursor's
     * mark-delete position: the entry after it, or entry {@code -1} when the mark-delete
     * entry id is {@code -1}. If it does not exist, it is created at
     * {@code defaultPosition}, which is then returned.
     *
     * @throws RuntimeException if the subscription currently has consumers, if its cursor
     *     cannot be found, or if an admin call fails
     */
    public MessageId getPositionFromSubscription(String topic, MessageId defaultPosition) {
        try {
            TopicStats topicStats = admin.topics().getStats(topic);
            if (!topicStats.subscriptions.containsKey(subscriptionName)) {
                admin.topics().createSubscription(topic, subscriptionName, defaultPosition);
                return defaultPosition;
            }
            SubscriptionStats subStats = topicStats.subscriptions.get(subscriptionName);
            if (!subStats.consumers.isEmpty()) {
                throw new RuntimeException("Subscription been actively used by other consumers, in this situation, the exactly-once semantics cannot be guaranteed.");
            }
            PersistentTopicInternalStats.CursorStats cursor = admin.topics().getInternalStats(topic).cursors.get(subscriptionName);
            if (cursor == null) {
                // Stats and internal stats come from two separate calls, so the cursor
                // may be missing from the second one; fail with context instead of an NPE.
                throw new IllegalStateException("No cursor found for subscription " + subscriptionName + " on topic " + topic);
            }
            // markDeletePosition has the form "<ledgerId>:<entryId>".
            String[] ids = cursor.markDeletePosition.split(":", 2);
            long ledgerId = Long.parseLong(ids[0]);
            long entryIdInMarkDelete = Long.parseLong(ids[1]);
            long entryId = entryIdInMarkDelete == -1L ? -1L : entryIdInMarkDelete + 1L;
            int partitionIdx = TopicName.getPartitionIndex(topic);
            return new MessageIdImpl(ledgerId, entryId, partitionIdx);
        } catch (PulsarAdminException e) {
            throw new RuntimeException("Failed to get stats for topic " + topic, e);
        }
    }

    /**
     * Returns the single schema shared by all given topics.
     *
     * @return the shared schema, or {@link SchemaUtils#emptySchemaInfo()} when
     *     {@code topics} is empty
     * @throws IncompatibleSchemaException if the topics do not all have the same schema
     */
    public SchemaInfo getPulsarSchema(List<String> topics) throws IncompatibleSchemaException {
        if (topics.isEmpty()) {
            return SchemaUtils.emptySchemaInfo();
        }
        Set<SchemaInfo> schemas = new HashSet<>();
        for (String topic : topics) {
            schemas.add(getPulsarSchema(topic));
        }
        if (schemas.size() != 1) {
            String described = schemas.stream().map(SchemaInfo::toString).collect(Collectors.joining(","));
            throw new IncompatibleSchemaException(String.format("Topics to read must share identical schema, however we got %d distinct schemas [%s]", schemas.size(), described), null);
        }
        return schemas.iterator().next();
    }

    /**
     * Fetches the schema of one topic.
     *
     * @return the topic's schema info, or the bytes schema info when the admin call fails
     *     with status 404
     * @throws RuntimeException wrapping any other failure
     */
    public SchemaInfo getPulsarSchema(String topic) {
        try {
            return admin.schemas().getSchemaInfo(TopicName.get(topic).toString());
        } catch (Throwable e) {
            if (e instanceof PulsarAdminException && ((PulsarAdminException) e).getStatusCode() == 404) {
                return BytesSchema.of().getSchemaInfo();
            }
            throw new RuntimeException(String.format("Failed to get schema information for %s", TopicName.get(topic).toString()), e);
        }
    }

    /**
     * Returns the topic partitions that {@link SourceSinkUtils#belongsTo} assigns to this
     * subtask.
     *
     * @throws PulsarAdminException if topic discovery fails
     */
    public Set<TopicRange> getTopicPartitions() throws PulsarAdminException {
        Set<TopicRange> topics = getTopicPartitionsAll();
        return topics.stream()
                .filter(t -> SourceSinkUtils.belongsTo(t, numParallelSubtasks, indexOfThisSubtask))
                .collect(Collectors.toSet());
    }

    /**
     * Expands every configured topic into its partitions.
     *
     * <p>A topic whose partition count is 0 is kept as is; otherwise one entry named
     * {@code <topic>-partition-<i>} is produced per partition, each carrying the topic's
     * range.
     *
     * @throws PulsarAdminException if topic discovery or a metadata lookup fails
     */
    public Set<TopicRange> getTopicPartitionsAll() throws PulsarAdminException {
        List<TopicRange> topics = getTopics();
        Set<TopicRange> allTopics = new HashSet<>();
        for (TopicRange topic : topics) {
            int partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
            if (partNum == 0) {
                allTopics.add(topic);
                continue;
            }
            for (int i = 0; i < partNum; ++i) {
                allTopics.add(new TopicRange(topic.getTopic() + "-partition-" + i, topic.getPulsarRange()));
            }
        }
        return allTopics;
    }

    /**
     * Resolves the topics selected by the connector options.
     *
     * <p>The first option whose key is in {@link PulsarOptions#TOPIC_OPTION_KEYS} decides
     * the result: {@code topic} names a single topic, {@code topics} is a comma-separated
     * list (empty items are dropped), and any other key is treated as a topic pattern.
     *
     * @return the selected topics paired with this reader's range; an empty list when no
     *     options were supplied or none of them selects topics
     * @throws PulsarAdminException if resolving a topic pattern fails
     */
    public List<TopicRange> getTopics() throws PulsarAdminException {
        if (caseInsensitiveParams == null) {
            return Collections.emptyList();
        }
        for (Map.Entry<String, String> e : caseInsensitiveParams.entrySet()) {
            String key = e.getKey();
            if (!PulsarOptions.TOPIC_OPTION_KEYS.contains(key)) {
                continue;
            }
            String value = e.getValue();
            if ("topic".equals(key)) {
                return Collections.singletonList(toTopicRange(TopicName.get(value).toString()));
            }
            if ("topics".equals(key)) {
                return Arrays.stream(value.split(","))
                        .filter(s -> !s.isEmpty())
                        .map(t -> TopicName.get(t).toString())
                        .map(this::toTopicRange)
                        .collect(Collectors.toList());
            }
            return getTopicsWithPattern(value).stream()
                    .map(this::toTopicRange)
                    .collect(Collectors.toList());
        }
        // An empty list (rather than null) keeps getTopicPartitionsAll safe to iterate.
        return Collections.emptyList();
    }

    /** Pairs a fully qualified topic name with this reader's key hash range. */
    private TopicRange toTopicRange(String topic) {
        return new TopicRange(topic, range.getPulsarRange());
    }

    /**
     * Lists the topics of the pattern's namespace whose name, with the {@code scheme://}
     * prefix removed, fully matches the pattern with the same prefix removed.
     */
    private List<String> getTopicsWithPattern(String topicsPattern) throws PulsarAdminException {
        TopicName dest = TopicName.get(topicsPattern);
        String namespace = dest.getNamespace();
        List<String> allNonPartitionedTopics = getNonPartitionedTopics(namespace);
        List<String> allPartitionedTopics = admin.topics().getPartitionedTopicList(namespace);
        Pattern shortenedTopicsPattern = Pattern.compile(dest.toString().split("://")[1]);
        return Stream.concat(allNonPartitionedTopics.stream(), allPartitionedTopics.stream())
                .map(t -> TopicName.get(t).toString())
                .filter(t -> shortenedTopicsPattern.matcher(t.split("://")[1]).matches())
                .collect(Collectors.toList());
    }

    /** Lists the topics of a namespace whose {@link TopicName} is not a partition name. */
    private List<String> getNonPartitionedTopics(String namespace) throws PulsarAdminException {
        return admin.topics().getList(namespace).stream()
                .filter(t -> !TopicName.get(t).isPartitioned())
                .collect(Collectors.toList());
    }

    /**
     * Returns the last message id of a topic as reported by the admin client.
     *
     * @throws RuntimeException wrapping any admin failure
     */
    public MessageId getLastMessageId(String topic) {
        try {
            return admin.topics().getLastMessageId(topic);
        } catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resets the cursor of the base subscription on {@code topic} to {@code messageId}.
     *
     * @throws RuntimeException wrapping any admin failure
     */
    public void resetCursor(String topic, MessageId messageId) {
        try {
            admin.topics().resetCursor(topic, subscriptionName, messageId);
        } catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    }

    /** Thrown by {@link #discoverTopicChanges()} once the reader has been closed. */
    public static class ClosedException
    extends Exception {
    }
}

