/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicListWatcher;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MultiTopicsConsumerImpl} whose set of topics is described by a regular expression
 * over the topics of a single namespace.
 *
 * <p>Two mechanisms visible in this class keep the subscription set up to date:
 * <ul>
 *   <li>a timer task ({@link #run(Timeout)}) that periodically looks up the topics under the
 *       namespace and applies the difference through a {@link TopicsChangedListener};</li>
 *   <li>for {@code PERSISTENT} subscription mode, a {@link TopicListWatcher} that is handed the
 *       same listener. Once the watcher future completes successfully the polling timeout is
 *       cancelled; if it fails, polling stays active as the fallback.</li>
 * </ul>
 *
 * @param <T> the message payload type of the consumer
 */
public class PatternMultiTopicsConsumerImpl<T>
extends MultiTopicsConsumerImpl<T>
implements TimerTask {
    private final Pattern topicsPattern;
    private final TopicsChangedListener topicsChangeListener;
    private final CommandGetTopicsOfNamespace.Mode subscriptionMode;
    private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
    protected NamespaceName namespaceName;
    // Backoff for retrying a failed recheck after reconnect; also used as the monitor that
    // serializes the application of lookup results in recheckTopicsChange().
    private final Backoff recheckPatternTaskBackoff;
    // Incremented on every recheck so that only the most recently started one is applied.
    private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
    // Handle of the pending polling timeout; set to null by closeAsync().
    private volatile Timeout recheckPatternTimeout = null;
    private volatile String topicsHash;
    private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);

    /**
     * Creates the consumer, schedules the first polling timeout and, for {@code PERSISTENT}
     * subscription mode, creates a {@link TopicListWatcher}.
     *
     * @param topicsPattern    pattern of the topics to subscribe to; its namespace part determines
     *                         {@link #namespaceName}
     * @param topicsHash       initial topics hash passed to the lookup and to the watcher
     * @param client           owning client, used for its timer, lookup service and configuration
     * @param conf             consumer configuration; {@code getPatternAutoDiscoveryPeriod()} is the
     *                         polling period in seconds (values below 1 are raised to 1)
     * @param executorProvider forwarded to the superclass
     * @param subscribeFuture  forwarded to the superclass
     * @param schema           forwarded to the superclass
     * @param subscriptionMode which kind of topics to look up; only {@code PERSISTENT} gets a watcher
     * @param interceptors     forwarded to the superclass
     * @throws IllegalArgumentException if the namespace of the pattern does not match
     *                                  {@link #namespaceName}
     */
    public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, String topicsHash, PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<org.apache.pulsar.shade.client.api.v2.Consumer<T>> subscribeFuture, Schema<T> schema, CommandGetTopicsOfNamespace.Mode subscriptionMode, ConsumerInterceptors<T> interceptors) {
        super(client, conf, executorProvider, subscribeFuture, schema, interceptors, false);
        this.topicsPattern = topicsPattern;
        this.topicsHash = topicsHash;
        this.subscriptionMode = subscriptionMode;
        this.recheckPatternTaskBackoff = new BackoffBuilder()
                .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
                .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
                .setMandatoryStop(0L, TimeUnit.SECONDS)
                .create();

        // Resolve the namespace once and reuse it for both the assignment and the sanity check.
        NamespaceName patternNamespace = PatternMultiTopicsConsumerImpl.getNameSpaceFromPattern(topicsPattern);
        if (this.namespaceName == null) {
            this.namespaceName = patternNamespace;
        }
        Preconditions.checkArgument(patternNamespace.toString().equals(this.namespaceName.toString()));

        this.topicsChangeListener = new PatternTopicsChangedListener();
        this.recheckPatternTimeout = client.timer().newTimeout(
                this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);

        if (subscriptionMode == CommandGetTopicsOfNamespace.Mode.PERSISTENT) {
            long watcherId = client.newTopicListWatcherId();
            // The instance is not kept here; it is obtained later through watcherFuture
            // (presumably completed by the watcher itself - confirm in TopicListWatcher).
            new TopicListWatcher(this.topicsChangeListener, client, topicsPattern, watcherId, this.namespaceName,
                    topicsHash, this.watcherFuture, this::recheckTopicsChangeAfterReconnect);
            this.watcherFuture
                    .thenAccept(watcher -> {
                        // closeAsync() nulls the field, so read it once and guard against null;
                        // otherwise a close that races with watcher creation would raise an NPE
                        // that is then misreported below as a watcher creation failure.
                        Timeout pollingTimeout = this.recheckPatternTimeout;
                        if (pollingTimeout != null) {
                            pollingTimeout.cancel();
                        }
                    })
                    .exceptionally(ex -> {
                        log.warn("Unable to create topic list watcher. Falling back to only polling for new topics", ex);
                        return null;
                    });
        } else {
            log.debug("Not creating topic list watcher for subscription mode {}", subscriptionMode);
            this.watcherFuture.complete(null);
        }
    }

    /**
     * Extracts the namespace from a topics pattern by parsing the pattern text as a topic name.
     *
     * @param pattern the topics pattern
     * @return the namespace object of the parsed topic name
     */
    public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
        return TopicName.get(pattern.pattern()).getNamespaceObject();
    }

    /** Returns {@code true} once the consumer state is {@code Closing} or {@code Closed}. */
    private boolean isShutDownRequested() {
        HandlerState.State state = this.getState();
        return state == HandlerState.State.Closing || state == HandlerState.State.Closed;
    }

    /**
     * Callback handed to the {@link TopicListWatcher}: rechecks the topic list and, on failure,
     * retries after a backoff delay until it succeeds or the consumer is closing/closed.
     */
    private void recheckTopicsChangeAfterReconnect() {
        if (this.isShutDownRequested()) {
            return;
        }
        this.recheckTopicsChange().whenComplete((ignore, ex) -> {
            if (ex == null) {
                this.recheckPatternTaskBackoff.reset();
                return;
            }
            log.warn("[{}] Failed to recheck topics change: {}", this.topic, ex.getMessage());
            long delayMs = this.recheckPatternTaskBackoff.next();
            this.client.timer().newTimeout(
                    timeout -> this.recheckTopicsChangeAfterReconnect(), delayMs, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Polling timer task: rechecks the topic list and schedules the next run.
     *
     * <p>The next timeout is scheduled whether or not the recheck succeeded, but never once the
     * consumer is closing or closed; otherwise a recheck that is in flight while
     * {@link #closeAsync()} runs would re-create the timer that close just cancelled.
     *
     * @param timeout the timeout that fired; nothing is done if it was cancelled
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled() || this.isShutDownRequested()) {
            return;
        }
        this.recheckTopicsChange()
                .exceptionally(ex -> {
                    log.warn("[{}] Failed to recheck topics change: {}", this.topic, ex.getMessage());
                    return null;
                })
                .thenAccept(ignored -> {
                    if (this.isShutDownRequested()) {
                        return;
                    }
                    this.recheckPatternTimeout = this.client.timer().newTimeout(
                            this, Math.max(1, this.conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
                });
    }

    /**
     * Looks up the topics under the namespace and applies the difference to the current
     * subscriptions.
     *
     * @return a future that completes when the listener callbacks triggered by this recheck have
     *         completed, or immediately if a newer recheck was started in the meantime
     */
    private CompletableFuture<Void> recheckTopicsChange() {
        String pattern = this.topicsPattern.pattern();
        int epoch = this.recheckPatternEpoch.incrementAndGet();
        return this.client.getLookup()
                .getTopicsUnderNamespace(this.namespaceName, this.subscriptionMode, pattern, this.topicsHash)
                .thenCompose(getTopicsResult -> {
                    // Lock on the backoff object rather than on the consumer itself so that this
                    // critical section does not contend with other users of the consumer's monitor.
                    synchronized (this.recheckPatternTaskBackoff) {
                        // If several rechecks overlap, only the most recently started one takes effect.
                        if (this.recheckPatternEpoch.get() > epoch) {
                            return CompletableFuture.completedFuture(null);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
                                    this.namespaceName, getTopicsResult.getTopics().size(),
                                    getTopicsResult.getTopicsHash(), getTopicsResult.isFiltered());
                            getTopicsResult.getTopics().forEach(topicName ->
                                    log.debug("Get topics under namespace {}, topic: {}", this.namespaceName, topicName));
                        }
                        // Current view: every partitioned topic by its base name, plus each
                        // partition entry that is not covered by one of those base names.
                        List<String> oldTopics = new ArrayList<>(this.getPartitionedTopics());
                        for (String partition : this.getPartitions()) {
                            TopicName partitionName = TopicName.get(partition);
                            boolean coveredByPartitionedTopic = partitionName.isPartitioned()
                                    && oldTopics.contains(partitionName.getPartitionedTopicName());
                            if (!coveredByPartitionedTopic) {
                                oldTopics.add(partition);
                            }
                        }
                        return PatternMultiTopicsConsumerImpl.updateSubscriptions(
                                this.topicsPattern, this::setTopicsHash, getTopicsResult,
                                this.topicsChangeListener, oldTopics);
                    }
                });
    }

    /**
     * Applies a lookup result: stores the new topics hash and, if the result reports a change,
     * notifies the listener about added and removed topics.
     *
     * @param topicsPattern         pattern used to filter the topics when the result is not
     *                              already filtered
     * @param topicsHashSetter      receives the topics hash of the result (always invoked)
     * @param getTopicsResult       the lookup result
     * @param topicsChangedListener listener notified with the added and then the removed topics
     * @param oldTopics             the topics currently known to the consumer
     * @return a future that completes when both listener callbacks have completed, or an already
     *         completed future when the result is unchanged
     */
    static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern, Consumer<String> topicsHashSetter, GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangedListener, List<String> oldTopics) {
        topicsHashSetter.accept(getTopicsResult.getTopicsHash());
        if (!getTopicsResult.isChanged()) {
            return CompletableFuture.completedFuture(null);
        }
        List<String> newTopics = getTopicsResult.isFiltered()
                ? getTopicsResult.getTopics()
                : TopicList.filterTopics(getTopicsResult.getTopics(), topicsPattern);
        List<CompletableFuture<Void>> listenersCallback = new ArrayList<>(2);
        listenersCallback.add(topicsChangedListener.onTopicsAdded(TopicList.minus(newTopics, oldTopics)));
        listenersCallback.add(topicsChangedListener.onTopicsRemoved(TopicList.minus(oldTopics, newTopics)));
        return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
    }

    /** Returns the topics pattern this consumer was created with. */
    public Pattern getPattern() {
        return this.topicsPattern;
    }

    /** Stores the topics hash that is sent with the next topics lookup. */
    @VisibleForTesting
    void setTopicsHash(String topicsHash) {
        this.topicsHash = topicsHash;
    }

    /**
     * Cancels the polling timeout, closes the topic list watcher if one was successfully
     * created, and closes the underlying multi-topics consumer.
     *
     * @return a future that completes when the watcher (if any) and the superclass are closed
     */
    @Override
    @SuppressFBWarnings
    public CompletableFuture<Void> closeAsync() {
        Timeout timeout = this.recheckPatternTimeout;
        if (timeout != null) {
            timeout.cancel();
            this.recheckPatternTimeout = null;
        }
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(2);
        if (this.watcherFuture.isDone() && !this.watcherFuture.isCompletedExceptionally()) {
            // The future is completed with null when no watcher is created (non-PERSISTENT mode).
            TopicListWatcher watcher = this.watcherFuture.getNow(null);
            if (watcher != null) {
                closeFutures.add(watcher.closeAsync());
            }
        }
        closeFutures.add(super.closeAsync());
        return FutureUtil.waitForAll(closeFutures);
    }

    /** Returns the handle of the pending polling timeout, or {@code null} after close. */
    @VisibleForTesting
    Timeout getRecheckPatternTimeout() {
        return this.recheckPatternTimeout;
    }

    /**
     * Listener that turns topic-list changes into consumer removals and subscriptions on the
     * enclosing consumer.
     */
    private class PatternTopicsChangedListener
    implements TopicsChangedListener {
        private PatternTopicsChangedListener() {
        }

        /**
         * Removes the consumers of all given topics.
         *
         * @param removedTopics topics to unsubscribe from
         * @return a future completed when all removals finished, or failed with the first
         *         reported removal failure
         */
        @Override
        public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
            if (removedTopics.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            // Size the list by what is actually being processed.
            List<CompletableFuture<Void>> futures = new ArrayList<>(removedTopics.size());
            for (String removedTopic : removedTopics) {
                futures.add(PatternMultiTopicsConsumerImpl.this.removeConsumerAsync(removedTopic));
            }
            CompletableFuture<Void> removeFuture = new CompletableFuture<>();
            FutureUtil.waitForAll(futures)
                    .thenAccept(finalFuture -> removeFuture.complete(null))
                    .exceptionally(ex -> {
                        log.warn("[{}] Failed to unsubscribe from topics: {}",
                                PatternMultiTopicsConsumerImpl.this.topic, ex.getMessage());
                        removeFuture.completeExceptionally(ex);
                        return null;
                    });
            return removeFuture;
        }

        /**
         * Subscribes to the given topics. Names are first reduced to their partitioned-topic
         * name and de-duplicated, so several partitions of one topic trigger one subscription.
         *
         * @param addedTopics topics to subscribe to
         * @return a future completed when all subscriptions finished, or failed with the first
         *         reported subscription failure
         */
        @Override
        public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
            if (addedTopics.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            Set<String> addTopicPartitionedName = addedTopics.stream()
                    .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName())
                    .collect(Collectors.toSet());
            // Size the list by what is actually being processed.
            List<CompletableFuture<Void>> futures = new ArrayList<>(addTopicPartitionedName.size());
            for (String partitionedTopic : addTopicPartitionedName) {
                futures.add(PatternMultiTopicsConsumerImpl.this.subscribeAsync(partitionedTopic, false));
            }
            CompletableFuture<Void> addFuture = new CompletableFuture<>();
            FutureUtil.waitForAll(futures)
                    .thenAccept(finalFuture -> addFuture.complete(null))
                    .exceptionally(ex -> {
                        log.warn("[{}] Failed to subscribe to topics: {}",
                                PatternMultiTopicsConsumerImpl.this.topic, ex.getMessage());
                        addFuture.completeExceptionally(ex);
                        return null;
                    });
            return addFuture;
        }
    }

    /** Callbacks invoked when the set of topics matching the pattern changes. */
    static interface TopicsChangedListener {
        /**
         * Called with the topics that no longer match or no longer exist.
         *
         * @param removedTopics the removed topic names
         * @return a future completed when the removal has been handled
         */
        public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);

        /**
         * Called with the topics that newly match the pattern.
         *
         * @param addedTopics the added topic names
         * @return a future completed when the addition has been handled
         */
        public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
    }
}

