/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TopicPoliciesService} that persists topic policies as {@link PulsarEvent} messages on a
 * per-namespace system topic and mirrors them into an in-memory cache.
 *
 * <p>For every namespace with at least one owned bundle a single system-topic reader is kept in
 * {@code readerCaches}. The reader first replays the existing events to populate
 * {@code policiesCache}; once the replay is finished it keeps reading new events, refreshing the
 * cache and notifying the listeners registered for the affected topic.
 */
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);

    private final PulsarService pulsarService;
    /** Lazily created by {@link #createSystemTopicFactoryIfNeeded()}; stays {@code null} if creation fails. */
    private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
    /** Latest known policies per topic. */
    private final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>();
    /** Number of bundles registered through {@link #addOwnedNamespaceBundleAsync} per namespace. */
    private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap<>();
    /** One (possibly still pending) system-topic reader per namespace. */
    private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader>> readerCaches =
            new ConcurrentHashMap<>();
    /** {@code false} while the initial replay of a namespace is running, {@code true} once it is done. */
    private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
    /** Listeners to invoke when a policies event for the topic is read. */
    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();

    public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
        this.pulsarService = pulsarService;
    }

    /**
     * Writes an {@code UPDATE} policies event for {@code topicName} to the namespace's system topic.
     *
     * @param topicName topic whose policies are updated
     * @param policies  policies to publish
     * @return a future completed once the event has been written; completed exceptionally if the
     *         system topic factory is unavailable, the writer cannot be created, the write fails,
     *         or the write yields no message id
     */
    @Override
    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        NamespaceEventsSystemTopicFactory factory = createSystemTopicFactoryIfNeeded();
        if (factory == null) {
            // Factory creation failed (already logged); report it instead of throwing an NPE.
            result.completeExceptionally(
                    new IllegalStateException("Namespace events system topic factory is not available."));
            return result;
        }
        SystemTopicClient systemTopicClient =
                factory.createSystemTopic(topicName.getNamespaceObject(), EventType.TOPIC_POLICY);
        CompletableFuture<SystemTopicClient.Writer> writerFuture = systemTopicClient.newWriterAsync();
        writerFuture.whenComplete((writer, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
                return;
            }
            PulsarEvent event = PulsarEvent.builder()
                    .actionType(ActionType.UPDATE)
                    .eventType(EventType.TOPIC_POLICY)
                    .topicPoliciesEvent(TopicPoliciesEvent.builder()
                            .domain(topicName.getDomain().toString())
                            .tenant(topicName.getTenant())
                            .namespace(topicName.getNamespaceObject().getLocalName())
                            .topic(topicName.getLocalName())
                            .policies(policies)
                            .build())
                    .build();
            writer.writeAsync(event).whenComplete((messageId, e) -> {
                if (e != null) {
                    result.completeExceptionally(e);
                } else if (messageId != null) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(new RuntimeException("Got message id is null."));
                }
                // The writer is single-use: close it whatever the outcome of the write was.
                writer.closeAsync().whenComplete((v, cause) -> {
                    if (cause != null) {
                        log.error("[{}] Close writer error.", topicName, cause);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] Close writer success.", topicName);
                    }
                });
            });
        });
        return result;
    }

    /**
     * Invokes the listeners registered for the topic named in a policies event.
     * Events of any other type are ignored.
     */
    private void notifyListener(Message<PulsarEvent> msg) {
        if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
            return;
        }
        TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
        TopicName topicName =
                TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
        // Single lookup: the mapping may be removed concurrently by unregisterListener.
        List<TopicPolicyListener<TopicPolicies>> topicListeners = listeners.get(topicName);
        if (topicListeners == null) {
            return;
        }
        TopicPolicies policies = event.getPolicies();
        for (TopicPolicyListener<TopicPolicies> listener : topicListeners) {
            try {
                listener.onUpdate(policies);
            } catch (RuntimeException e) {
                // A faulty listener must not break the reader loop that drives this method.
                log.error("[{}] Topic policy listener failed to handle update.", topicName, e);
            }
        }
    }

    /**
     * @return {@code true} only if the namespace of {@code topicName} is tracked and its initial
     *         replay has completed
     */
    @Override
    public boolean cacheIsInitialized(TopicName topicName) {
        // Single read; a containsKey/get pair could unbox null if the entry is removed in between.
        return Boolean.TRUE.equals(policyCacheInitMap.get(topicName.getNamespaceObject()));
    }

    /**
     * Returns the cached policies of a topic.
     *
     * @return the cached policies, or {@code null} if none are cached
     * @throws BrokerServiceException.TopicPoliciesCacheNotInitException if the namespace is tracked
     *         but its initial replay has not completed yet
     */
    @Override
    public TopicPolicies getTopicPolicies(TopicName topicName)
            throws BrokerServiceException.TopicPoliciesCacheNotInitException {
        if (Boolean.FALSE.equals(policyCacheInitMap.get(topicName.getNamespaceObject()))) {
            throw new BrokerServiceException.TopicPoliciesCacheNotInitException();
        }
        return policiesCache.get(topicName);
    }

    /**
     * Reads the policies of a topic straight from the system topic with a dedicated reader,
     * bypassing {@code policiesCache}.
     *
     * @return a future holding the last policies found for the topic, or {@code null} if there are
     *         none or the system topic factory is unavailable; completed exceptionally if the
     *         reader cannot be created or reading fails
     */
    @Override
    public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
        CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
        NamespaceEventsSystemTopicFactory factory = createSystemTopicFactoryIfNeeded();
        if (factory == null) {
            result.complete(null);
            return result;
        }
        SystemTopicClient systemTopicClient =
                factory.createSystemTopic(topicName.getNamespaceObject(), EventType.TOPIC_POLICY);
        CompletableFuture<SystemTopicClient.Reader> readerFuture = systemTopicClient.newReaderAsync();
        readerFuture.whenComplete((reader, ex) -> {
            if (ex != null) {
                // Propagate the failure; otherwise the caller's future would never complete.
                result.completeExceptionally(ex);
            } else {
                fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, null, result);
            }
        });
        return result;
    }

    /**
     * Registers an owned bundle. The first bundle of a namespace creates the namespace's reader and
     * starts the initial replay; later bundles only increase the owned-bundle count.
     *
     * @return a future completed when the namespace's cache has been initialized (immediately if a
     *         reader is already cached); completed exceptionally if the factory is unavailable,
     *         the reader cannot be created, or the replay fails
     */
    @Override
    public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        NamespaceName namespace = namespaceBundle.getNamespaceObject();
        NamespaceEventsSystemTopicFactory factory = createSystemTopicFactoryIfNeeded();
        if (factory == null) {
            result.completeExceptionally(
                    new IllegalStateException("Namespace events system topic factory is not available."));
            return result;
        }
        synchronized (this) {
            if (readerCaches.get(namespace) != null) {
                // The counter may have been dropped by a concurrent cleanup; recreate it if so.
                ownedBundlesCountPerNamespace.computeIfAbsent(namespace, k -> new AtomicInteger(0))
                        .incrementAndGet();
                result.complete(null);
                return result;
            }
            SystemTopicClient systemTopicClient = factory.createSystemTopic(namespace, EventType.TOPIC_POLICY);
            ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
            policyCacheInitMap.put(namespace, false);
            CompletableFuture<SystemTopicClient.Reader> readerFuture = systemTopicClient.newReaderAsync();
            readerCaches.put(namespace, readerFuture);
            readerFuture.whenComplete((reader, ex) -> {
                if (ex != null) {
                    log.error("[{}] Failed to create topic policies reader.", namespace, ex);
                    // Drop the failed future so the next bundle load retries reader creation.
                    readerCaches.remove(namespace, readerFuture);
                    result.completeExceptionally(ex);
                    return;
                }
                initPolicesCache(reader, result);
                // Tail the topic only after the replay is done: two concurrent read loops on one
                // reader could steal the last event from the replay and stall it.
                result.thenRun(() -> readMorePolicies(reader));
            });
        }
        return result;
    }

    /**
     * Unregisters an owned bundle. When the namespace has no remaining owned bundle (or no count is
     * tracked) and a reader is cached, the reader is closed and the namespace's cached state is
     * dropped.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        NamespaceName namespace = namespaceBundle.getNamespaceObject();
        // Same monitor as addOwnedNamespaceBundleAsync so count and reader changes do not interleave.
        synchronized (this) {
            AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
            if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
                CompletableFuture<SystemTopicClient.Reader> readerFuture = readerCaches.remove(namespace);
                if (readerFuture != null) {
                    readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
                    ownedBundlesCountPerNamespace.remove(namespace);
                    policyCacheInitMap.remove(namespace);
                    policiesCache.keySet().removeIf(topic -> topic.getNamespaceObject().equals(namespace));
                }
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Hooks this service into bundle ownership changes: every loaded bundle is added and every
     * unloaded bundle is removed.
     */
    @Override
    public void start() {
        pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(
                new NamespaceBundleOwnershipListener() {

                    @Override
                    public void onLoad(NamespaceBundle bundle) {
                        addOwnedNamespaceBundleAsync(bundle);
                    }

                    @Override
                    public void unLoad(NamespaceBundle bundle) {
                        removeOwnedNamespaceBundleAsync(bundle);
                    }

                    @Override
                    public boolean test(NamespaceBundle namespaceBundle) {
                        // Applies to every bundle.
                        return true;
                    }
                });
    }

    /**
     * Replays the events currently available on the reader into {@code policiesCache}, one event
     * per asynchronous step, then marks the namespace as initialized and completes {@code future}.
     */
    private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture<Void> future) {
        reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
            if (ex != null) {
                failPoliciesCacheInit(reader, future, ex);
                return;
            }
            if (!hasMore) {
                // Flip the flag before completing so callbacks on the future see an initialized cache.
                policyCacheInitMap.computeIfPresent(
                        reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
                future.complete(null);
                return;
            }
            reader.readNextAsync().whenComplete((msg, e) -> {
                if (e != null) {
                    failPoliciesCacheInit(reader, future, e);
                    return;
                }
                refreshTopicPoliciesCache(msg);
                initPolicesCache(reader, future);
            });
        });
    }

    /** Cleans up after a failed replay: evicts and closes the reader, then fails {@code future}. */
    private void failPoliciesCacheInit(SystemTopicClient.Reader reader, CompletableFuture<Void> future,
                                       Throwable error) {
        NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
        log.error("[{}] Failed to initialize topic policies cache.", namespace, error);
        removeReaderIfCached(namespace, reader);
        // No read loop is running on this reader yet, so it would otherwise be leaked.
        closeReader(reader, namespace);
        future.completeExceptionally(error);
    }

    /**
     * Endless read loop: applies every new event to the cache, notifies listeners and reads again.
     * The loop ends when the reader reports that it is already closed; other errors are retried.
     */
    private void readMorePolicies(SystemTopicClient.Reader reader) {
        reader.readNextAsync().whenComplete((msg, ex) -> {
            if (ex == null) {
                refreshTopicPoliciesCache(msg);
                notifyListener(msg);
                readMorePolicies(reader);
                return;
            }
            if (unwrapCompletionException(ex) instanceof PulsarClientException.AlreadyClosedException) {
                log.error("Read more topic policies exception, close the read now!", ex);
                NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
                // Only clean up state that still belongs to this reader; the namespace may have
                // been unloaded and loaded again with a fresh reader in the meantime.
                if (removeReaderIfCached(namespace, reader)) {
                    ownedBundlesCountPerNamespace.remove(namespace);
                }
            } else {
                log.warn("Read more topic policies exception, read again.", ex);
                readMorePolicies(reader);
            }
        });
    }

    /**
     * Applies a policies event to {@code policiesCache}; events of any other type are ignored.
     * An event carrying no policies removes the cached entry.
     */
    private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
        if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
            return;
        }
        TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
        TopicName topicName =
                TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
        TopicPolicies policies = event.getPolicies();
        if (policies == null) {
            // ConcurrentHashMap rejects null values; treat "no policies" as "nothing cached".
            policiesCache.remove(topicName);
        } else {
            policiesCache.put(topicName, policies);
        }
    }

    /**
     * Lazily creates the system topic factory, guarded by this instance's monitor.
     *
     * @return the factory, or {@code null} if it could not be created (the error is logged)
     */
    private NamespaceEventsSystemTopicFactory createSystemTopicFactoryIfNeeded() {
        NamespaceEventsSystemTopicFactory factory = namespaceEventsSystemTopicFactory;
        if (factory == null) {
            synchronized (this) {
                factory = namespaceEventsSystemTopicFactory;
                if (factory == null) {
                    try {
                        factory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
                        namespaceEventsSystemTopicFactory = factory;
                    } catch (PulsarServerException e) {
                        log.error("Create namespace event system topic factory error.", e);
                    }
                }
            }
        }
        return factory;
    }

    /**
     * Scans the remaining events of {@code reader}, remembering the last policies seen for
     * {@code topicName}, then completes {@code future} with them and closes the reader. The reader
     * is also closed when the scan fails.
     *
     * @param policies policies found so far ({@code null} if none yet)
     */
    private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader reader, TopicName topicName,
                                                       TopicPolicies policies,
                                                       CompletableFuture<TopicPolicies> future) {
        reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
            if (ex != null) {
                future.completeExceptionally(ex);
                closeReader(reader, topicName);
                return;
            }
            if (!hasMore) {
                future.complete(policies);
                closeReader(reader, topicName);
                return;
            }
            reader.readNextAsync().whenComplete((msg, e) -> {
                if (e != null) {
                    future.completeExceptionally(e);
                    closeReader(reader, topicName);
                    return;
                }
                TopicPolicies latest = policies;
                PulsarEvent event = msg.getValue();
                if (EventType.TOPIC_POLICY.equals(event.getEventType())) {
                    TopicPoliciesEvent policiesEvent = event.getTopicPoliciesEvent();
                    TopicName eventTopic = TopicName.get(policiesEvent.getDomain(), policiesEvent.getTenant(),
                            policiesEvent.getNamespace(), policiesEvent.getTopic());
                    if (topicName.equals(eventTopic)) {
                        latest = policiesEvent.getPolicies();
                    }
                }
                // Always keep scanning, including after events of other types; stopping here
                // would leave the future pending forever.
                fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, latest, future);
            });
        });
    }

    /**
     * Removes the cached reader future of {@code namespace} only if it resolved to {@code reader}.
     *
     * @return {@code true} if the entry was removed
     */
    private boolean removeReaderIfCached(NamespaceName namespace, SystemTopicClient.Reader reader) {
        CompletableFuture<SystemTopicClient.Reader> cached = readerCaches.get(namespace);
        if (cached != null && cached.isDone() && !cached.isCompletedExceptionally()
                && cached.getNow(null) == reader) {
            return readerCaches.remove(namespace, cached);
        }
        return false;
    }

    /** Closes a reader asynchronously and logs a failure; {@code context} only labels the log line. */
    private void closeReader(SystemTopicClient.Reader reader, Object context) {
        reader.closeAsync().whenComplete((v, e) -> {
            if (e != null) {
                log.error("[{}] Close reader error.", context, e);
            }
        });
    }

    /** Returns the cause of a {@link CompletionException}, or the throwable itself otherwise. */
    private static Throwable unwrapCompletionException(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    @VisibleForTesting
    long getPoliciesCacheSize() {
        return policiesCache.size();
    }

    @VisibleForTesting
    long getReaderCacheCount() {
        return readerCaches.size();
    }

    @VisibleForTesting
    boolean checkReaderIsCached(NamespaceName namespaceName) {
        return readerCaches.get(namespaceName) != null;
    }

    @VisibleForTesting
    Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
        return policyCacheInitMap.get(namespaceName);
    }

    /** Adds {@code listener} to the listeners notified about policies events of {@code topicName}. */
    @Override
    public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
        // Mutate inside compute() so a concurrent unregisterListener cannot drop the list between
        // the lookup and the add.
        listeners.compute(topicName, (k, existing) -> {
            List<TopicPolicyListener<TopicPolicies>> current = existing;
            if (current == null) {
                current = Lists.newCopyOnWriteArrayList();
            }
            current.add(listener);
            return current;
        });
    }

    /** Removes {@code listener} from the listeners of {@code topicName}; no-op if not registered. */
    @Override
    public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
        // Never create an entry just to remove from it, and drop the mapping once it is empty so
        // the map does not accumulate one list per topic ever seen.
        listeners.computeIfPresent(topicName, (k, current) -> {
            current.remove(listener);
            return current.isEmpty() ? null : current;
        });
    }
}

