/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.carrotsearch.hppc.ObjectObjectHashMap;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTopic
extends AbstractTopic
implements Topic,
AsyncCallbacks.AddEntryCallback {
    protected final ManagedLedger ledger;
    private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
    private final ConcurrentOpenHashMap<String, Replicator> replicators;
    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
    private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
    private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60L;
    public boolean msgChunkPublished;
    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
    private final long backloggedCursorThresholdEntries;
    public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
    protected final MessageDeduplication messageDeduplication;
    private static final long COMPACTION_NEVER_RUN = -4273917950L;
    private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(-4273917950L);
    private final CompactedTopic compactedTopic;
    private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture((MessageIdImpl)MessageId.earliest);
    private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
    private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>(){

        @Override
        protected TopicStatsHelper initialValue() {
            return new TopicStatsHelper();
        }
    };
    private final AtomicLong pendingWriteOps = new AtomicLong(0L);
    private volatile double lastUpdatedAvgPublishRateInMsg = 0.0;
    private volatile double lastUpdatedAvgPublishRateInByte = 0.0;
    private volatile int maxUnackedMessagesOnSubscriptionApplied;
    private volatile boolean isClosingOrDeleting = false;
    private ScheduledFuture<?> fencedTopicMonitoringTask = null;
    protected final TransactionBuffer transactionBuffer;
    private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
    private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
    private long lastDataMessagePublishedTimestamp = 0L;
    private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
        super(topic, brokerService);
        this.ledger = ledger;
        this.subscriptions = new ConcurrentOpenHashMap(16, 1);
        this.replicators = new ConcurrentOpenHashMap(16, 1);
        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
        this.initializeRateLimiterIfNeeded(Optional.empty());
        this.registerTopicPolicyListener();
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
        for (ManagedCursor cursor : ledger.getCursors()) {
            if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME) || cursor.getName().startsWith(this.replicatorPrefix)) continue;
            String subscriptionName = Codec.decode(cursor.getName());
            this.subscriptions.put(subscriptionName, this.createPersistentSubscription(subscriptionName, cursor, PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null));
            this.subscriptions.get(subscriptionName).deactivateCursor();
        }
        this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
        if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
            this.topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
        }
        this.checkReplicatedSubscriptionControllerState();
        TopicName topicName = TopicName.get(topic);
        this.transactionBuffer = brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() && !EventsTopicNames.checkTopicIsEventsNames(topicName) ? brokerService.getPulsar().getTransactionBufferProvider().newTransactionBuffer(this) : new TransactionBufferDisable();
        this.transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)ledger.getLastConfirmedEntry());
    }

    @Override
    public CompletableFuture<Void> initialize() {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
        futures.add(this.initTopicPolicy());
        for (ManagedCursor cursor : this.ledger.getCursors()) {
            if (!cursor.getName().startsWith(this.replicatorPrefix)) continue;
            String localCluster = this.brokerService.pulsar().getConfiguration().getClusterName();
            String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
            futures.add(this.addReplicationCluster(remoteCluster, cursor, localCluster));
        }
        return FutureUtil.waitForAll(futures).thenCompose(__ -> ((CompletableFuture)this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(TopicName.get(this.topic).getNamespaceObject()).thenAccept(optPolicies -> {
            if (!optPolicies.isPresent()) {
                this.isEncryptionRequired = false;
                return;
            }
            Policies policies = (Policies)optPolicies.get();
            this.updateTopicPolicyByNamespacePolicy(policies);
            this.isEncryptionRequired = policies.encryption_required;
            this.isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
            this.schemaValidationEnforced = policies.schema_validation_enforced;
        })).exceptionally(ex -> {
            log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", (Object)this.topic, (Object)ex.getMessage());
            this.isEncryptionRequired = false;
            return null;
        }));
    }

    @VisibleForTesting
    PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) {
        super(topic, brokerService);
        this.ledger = ledger;
        this.messageDeduplication = messageDeduplication;
        this.subscriptions = new ConcurrentOpenHashMap(16, 1);
        this.replicators = new ConcurrentOpenHashMap(16, 1);
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
        this.transactionBuffer = brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled() ? brokerService.getPulsar().getTransactionBufferProvider().newTransactionBuffer(this) : new TransactionBufferDisable();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
        Optional<DispatchRateLimiter> optional = this.dispatchRateLimiter;
        synchronized (optional) {
            if (!this.dispatchRateLimiter.isPresent() && DispatchRateLimiter.isDispatchRateNeeded(this.brokerService, policies, this.topic, DispatchRateLimiter.Type.TOPIC)) {
                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, DispatchRateLimiter.Type.TOPIC));
            }
            boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(this.brokerService, policies, this.topic);
            if (!this.subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
            } else if (!isDispatchRateNeeded) {
                this.subscribeRateLimiter = Optional.empty();
            }
            this.subscriptions.forEach((name, subscription) -> {
                Dispatcher dispatcher = subscription.getDispatcher();
                if (dispatcher != null) {
                    dispatcher.initializeDispatchRateLimiterIfNeeded(policies);
                }
            });
            this.replicators.forEach((name, replicator) -> replicator.initializeDispatchRateLimiterIfNeeded(policies));
        }
    }

    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated, Map<String, String> subscriptionProperties) {
        Preconditions.checkNotNull(this.compactedTopic);
        if (subscriptionName.equals("__compaction")) {
            return new CompactorSubscription(this, this.compactedTopic, subscriptionName, cursor);
        }
        return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties);
    }

    @Override
    public void publishMessage(ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        this.pendingWriteOps.incrementAndGet();
        if (this.isFenced) {
            publishContext.completed(new BrokerServiceException.TopicFencedException("fenced"), -1L, -1L);
            this.decrementPendingWriteOpsAndCheck();
            return;
        }
        if (this.isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
            publishContext.completed(new BrokerServiceException.NotAllowedException("Exceed maximum message size"), -1L, -1L);
            this.decrementPendingWriteOpsAndCheck();
            return;
        }
        MessageDeduplication.MessageDupStatus status = this.messageDeduplication.isDuplicate(publishContext, headersAndPayload);
        switch (status) {
            case NotDup: {
                this.asyncAddEntry(headersAndPayload, publishContext);
                break;
            }
            case Dup: {
                publishContext.completed(null, -1L, -1L);
                this.decrementPendingWriteOpsAndCheck();
                break;
            }
            default: {
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1L, -1L);
                this.decrementPendingWriteOpsAndCheck();
            }
        }
    }

    private void asyncAddEntry(ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        if (this.brokerService.isBrokerEntryMetadataEnabled()) {
            this.ledger.asyncAddEntry(headersAndPayload, (int)publishContext.getNumberOfMessages(), this, publishContext);
        } else {
            this.ledger.asyncAddEntry(headersAndPayload, (AsyncCallbacks.AddEntryCallback)this, (Object)publishContext);
        }
    }

    public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
        if (this.ledger instanceof ManagedLedgerImpl) {
            ((ManagedLedgerImpl)this.ledger).asyncReadEntry(position, callback, ctx);
        } else {
            callback.readEntryFailed(new ManagedLedgerException("Unexpected managedledger implementation, doesn't support direct read entry operation."), ctx);
        }
    }

    public PositionImpl getPositionAfterN(PositionImpl startPosition, long n) throws ManagedLedgerException {
        if (this.ledger instanceof ManagedLedgerImpl) {
            return ((ManagedLedgerImpl)this.ledger).getPositionAfterN(startPosition, n, ManagedLedgerImpl.PositionBound.startExcluded);
        }
        throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support getPositionAfterN operation.");
    }

    public PositionImpl getFirstPosition() throws ManagedLedgerException {
        if (this.ledger instanceof ManagedLedgerImpl) {
            return ((ManagedLedgerImpl)this.ledger).getFirstPosition();
        }
        throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support getFirstPosition operation.");
    }

    public long getNumberOfEntries() {
        return this.ledger.getNumberOfEntries();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void decrementPendingWriteOpsAndCheck() {
        long pending = this.pendingWriteOps.decrementAndGet();
        if (pending == 0L && this.isFenced && !this.isClosingOrDeleting) {
            PersistentTopic persistentTopic = this;
            synchronized (persistentTopic) {
                if (this.isFenced && !this.isClosingOrDeleting) {
                    this.messageDeduplication.resetHighestSequenceIdPushed();
                    log.info("[{}] Un-fencing topic...", (Object)this.topic);
                    this.ledger.readyToCreateNewLedger();
                    this.unfence();
                }
            }
        }
    }

    @Override
    public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
        Topic.PublishContext publishContext = (Topic.PublishContext)ctx;
        PositionImpl position = (PositionImpl)pos;
        this.messageDeduplication.recordMessagePersisted(publishContext, position);
        if (!publishContext.isMarkerMessage()) {
            this.lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
        }
        this.transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)this.ledger.getLastConfirmedEntry());
        publishContext.setMetadataFromEntryData(entryData);
        publishContext.completed(null, position.getLedgerId(), position.getEntryId());
        this.decrementPendingWriteOpsAndCheck();
    }

    @Override
    public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
        if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            this.close();
        } else {
            CompletableFuture<Object> disconnectProducersFuture;
            this.fence();
            if (this.producers.size() > 0) {
                ArrayList futures = Lists.newArrayList();
                this.producers.forEach((__, producer) -> futures.add(producer.disconnect()));
                disconnectProducersFuture = FutureUtil.waitForAll(futures);
            } else {
                disconnectProducersFuture = CompletableFuture.completedFuture(null);
            }
            disconnectProducersFuture.handle((aVoid, throwable) -> {
                this.decrementPendingWriteOpsAndCheck();
                return null;
            });
            Topic.PublishContext callback = (Topic.PublishContext)ctx;
            if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Failed to persist msg in store: {}", (Object)this.topic, (Object)exception.getMessage());
                }
                callback.completed(new BrokerServiceException.TopicClosedException(exception), -1L, -1L);
                return;
            }
            log.warn("[{}] Failed to persist msg in store: {}", (Object)this.topic, (Object)exception.getMessage());
            if (exception instanceof ManagedLedgerException.ManagedLedgerTerminatedException) {
                callback.completed(new BrokerServiceException.TopicTerminatedException(exception), -1L, -1L);
            } else {
                callback.completed(new BrokerServiceException.PersistenceException(exception), -1L, -1L);
            }
        }
    }

    @Override
    public CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture) {
        return super.addProducer(producer, producerQueuedFuture).thenCompose(topicEpoch -> {
            this.messageDeduplication.producerAdded(producer.getProducerName());
            return this.startReplProducers().thenApply(__ -> topicEpoch);
        });
    }

    @Override
    public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
        return this.getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled);
    }

    @Override
    protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
        long newEpoch = currentEpoch.orElse(-1L) + 1L;
        return this.setTopicEpoch(newEpoch);
    }

    @Override
    protected CompletableFuture<Long> setTopicEpoch(final long newEpoch) {
        final CompletableFuture<Long> future = new CompletableFuture<Long>();
        this.ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new AsyncCallbacks.UpdatePropertiesCallback(){

            @Override
            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                log.info("[{}] Updated topic epoch to {}", (Object)PersistentTopic.this.getName(), (Object)newEpoch);
                future.complete(newEpoch);
            }

            @Override
            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Failed to update topic epoch to {}: {}", new Object[]{PersistentTopic.this.getName(), newEpoch, exception.getMessage()});
                future.completeExceptionally(exception);
            }
        }, null);
        return future;
    }

    private boolean hasRemoteProducers() {
        if (this.producers.isEmpty()) {
            return false;
        }
        for (Producer producer : this.producers.values()) {
            if (!producer.isRemote()) continue;
            return true;
        }
        return false;
    }

    public CompletableFuture<Void> startReplProducers() {
        return ((CompletableFuture)this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(TopicName.get(this.topic).getNamespaceObject()).thenAccept(optPolicies -> {
            if (optPolicies.isPresent()) {
                if (((Policies)optPolicies.get()).replication_clusters != null) {
                    TreeSet<String> configuredClusters = Sets.newTreeSet(((Policies)optPolicies.get()).replication_clusters);
                    this.replicators.forEach((region, replicator) -> {
                        if (configuredClusters.contains(region)) {
                            replicator.startProducer();
                        }
                    });
                }
            } else {
                this.replicators.forEach((region, replicator) -> replicator.startProducer());
            }
        })).exceptionally(ex -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies while starting repl-producers {}", (Object)this.topic, (Object)ex.getMessage());
            }
            this.replicators.forEach((region, replicator) -> replicator.startProducer());
            return null;
        });
    }

    public CompletableFuture<Void> stopReplProducers() {
        ArrayList closeFutures = Lists.newArrayList();
        this.replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
        return FutureUtil.waitForAll(closeFutures);
    }

    private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
        ArrayList closeFutures = Lists.newArrayList();
        this.replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
        return FutureUtil.waitForAll(closeFutures);
    }

    @Override
    protected void handleProducerRemoved(Producer producer) {
        super.handleProducerRemoved(producer);
        this.messageDeduplication.producerRemoved(producer.getProducerName());
    }

    @Override
    public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
        return this.internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(), option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()));
    }

    private CompletableFuture<Consumer> internalSubscribe(TransportCnx cnx, String subscriptionName, long consumerId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, CommandSubscribe.InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, Map<String, String> subscriptionProperties) {
        if (readCompacted && subType != CommandSubscribe.SubType.Failover && subType != CommandSubscribe.SubType.Exclusive) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
        }
        return this.brokerService.checkTopicNsOwnership(this.getName()).thenCompose(__ -> {
            Object consumer;
            boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
            if (replicatedSubscriptionState && !this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
                log.warn("[{}] Replicated Subscription is disabled by broker.", (Object)this.getName());
                replicatedSubscriptionState = false;
            }
            if (subType == CommandSubscribe.SubType.Key_Shared && !this.brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
                return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Key_Shared subscription is disabled by broker."));
            }
            try {
                if (!this.topic.endsWith("__change_events") && !this.checkSubscriptionTypesEnable(subType)) {
                    return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Topic[{" + this.topic + "}] doesn't support " + subType.name() + " sub type!"));
                }
            }
            catch (Exception e) {
                return FutureUtil.failedFuture(e);
            }
            if (StringUtils.isBlank(subscriptionName)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Empty subscription name", (Object)this.topic);
                }
                return FutureUtil.failedFuture(new BrokerServiceException.NamingException("Empty subscription name"));
            }
            if (this.hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer doesn't support batch-message {}", (Object)this.topic, (Object)subscriptionName);
                }
                return FutureUtil.failedFuture(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
            }
            if (subscriptionName.startsWith(this.replicatorPrefix) || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) {
                log.warn("[{}] Failed to create subscription for {}", (Object)this.topic, (Object)subscriptionName);
                return FutureUtil.failedFuture(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
            }
            if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
                consumer = new SubscribeRateLimiter.ConsumerIdentifier(cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
                if (!(!this.subscribeRateLimiter.isPresent() || this.subscribeRateLimiter.get().subscribeAvailable((SubscribeRateLimiter.ConsumerIdentifier)consumer) && this.subscribeRateLimiter.get().tryAcquire((SubscribeRateLimiter.ConsumerIdentifier)consumer))) {
                    log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}", new Object[]{this.topic, subscriptionName, consumer, this.subscribeRateLimiter.get().getSubscribeRate(), this.subscribeRateLimiter.get().getAvailableSubscribeRateLimit((SubscribeRateLimiter.ConsumerIdentifier)consumer)});
                    return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
                }
            }
            this.lock.readLock().lock();
            try {
                if (this.isFenced) {
                    log.warn("[{}] Attempting to subscribe to a fenced topic", (Object)this.topic);
                    consumer = FutureUtil.failedFuture(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                    return consumer;
                }
                this.handleConsumerAdded(subscriptionName, consumerName);
            }
            finally {
                this.lock.readLock().unlock();
            }
            CompletableFuture<Subscription> subscriptionFuture = isDurable ? this.getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState, subscriptionProperties) : this.getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted);
            CompletionStage future = subscriptionFuture.thenCompose(subscription -> {
                Consumer consumer = new Consumer((Subscription)subscription, subType, this.topic, consumerId, priorityLevel, consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, startMessageId);
                return this.addConsumerToSubscription((Subscription)subscription, consumer).thenCompose(v -> {
                    this.checkBackloggedCursors();
                    if (!cnx.isActive()) {
                        try {
                            consumer.close();
                        }
                        catch (BrokerServiceException e) {
                            if (e instanceof BrokerServiceException.ConsumerBusyException) {
                                log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, subscriptionName, consumerId, consumerName});
                            } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                                log.warn("[{}][{}] {}", new Object[]{this.topic, subscriptionName, e.getMessage()});
                            }
                            this.decrementUsageCount();
                            return FutureUtil.failedFuture(e);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", new Object[]{this.topic, subscriptionName, consumer.consumerName(), this.currentUsageCount()});
                        }
                        this.decrementUsageCount();
                        return FutureUtil.failedFuture(new BrokerServiceException("Connection was closed while the opening the cursor "));
                    }
                    this.checkReplicatedSubscriptionControllerState();
                    log.info("[{}][{}] Created new subscription for {}", new Object[]{this.topic, subscriptionName, consumerId});
                    return CompletableFuture.completedFuture(consumer);
                });
            });
            ((CompletableFuture)future).exceptionally(ex -> {
                this.decrementUsageCount();
                if (ex.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                    log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, subscriptionName, consumerId, consumerName});
                    Consumer consumer = null;
                    try {
                        Consumer consumer2 = consumer = subscriptionFuture.isDone() ? this.getActiveConsumer((Subscription)subscriptionFuture.get()) : null;
                        if (consumer != null && !consumer.cnx().isActive()) {
                            consumer.close();
                        }
                    }
                    catch (Exception be) {
                        log.error("Failed to clean up consumer on closed connection {}, {}", (Object)consumer, (Object)be.getMessage());
                    }
                } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionBusyException) {
                    log.warn("[{}][{}] {}", new Object[]{this.topic, subscriptionName, ex.getMessage()});
                } else {
                    log.error("[{}] Failed to create subscription: {}", new Object[]{this.topic, subscriptionName, ex});
                }
                return null;
            });
            return future;
        });
    }

    @Override
    public CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, CommandSubscribe.InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta) {
        return this.internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionStateArg, keySharedMeta, null);
    }

    private CompletableFuture<Subscription> getDurableSubscription(final String subscriptionName, CommandSubscribe.InitialPosition initialPosition, final long startMessageRollbackDurationSec, final boolean replicated, final Map<String, String> subscriptionProperties) {
        final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<Subscription>();
        if (this.checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
            subscriptionFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Exceed the maximum number of subscriptions of the topic: " + this.topic));
            return subscriptionFuture;
        }
        Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
        this.ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new AsyncCallbacks.OpenCursorCallback(){

            @Override
            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                PersistentSubscription subscription;
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Opened cursor", (Object)PersistentTopic.this.topic, (Object)subscriptionName);
                }
                if ((subscription = (PersistentSubscription)PersistentTopic.this.subscriptions.get(subscriptionName)) == null) {
                    subscription = PersistentTopic.this.subscriptions.computeIfAbsent(subscriptionName, name -> PersistentTopic.this.createPersistentSubscription(subscriptionName, cursor, replicated, subscriptionProperties));
                } else if (subscription.getCursor() != null && !subscription.getCursor().isDurable()) {
                    subscriptionFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("NonDurable subscription with the same name already exists."));
                    return;
                }
                if (MapUtils.isEmpty(subscription.getSubscriptionProperties()) && MapUtils.isNotEmpty(subscriptionProperties)) {
                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
                }
                if (replicated && !subscription.isReplicated()) {
                    subscription.setReplicated(replicated);
                }
                if (startMessageRollbackDurationSec > 0L) {
                    PersistentTopic.this.resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec);
                } else {
                    subscriptionFuture.complete(subscription);
                }
            }

            @Override
            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Failed to create subscription for {}: {}", new Object[]{PersistentTopic.this.topic, subscriptionName, exception.getMessage()});
                PersistentTopic.this.decrementUsageCount();
                subscriptionFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                    PersistentTopic.this.close();
                }
            }
        }, null);
        return subscriptionFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName, MessageId startMessageId, CommandSubscribe.InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean isReadCompacted) {
        log.info("[{}][{}] Creating non-durable subscription at msg id {}", new Object[]{this.topic, subscriptionName, startMessageId});
        CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<Subscription>();
        if (this.checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
            subscriptionFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Exceed the maximum number of subscriptions of the topic: " + this.topic));
            return subscriptionFuture;
        }
        ManagedLedger managedLedger = this.ledger;
        synchronized (managedLedger) {
            PersistentSubscription subscription = this.subscriptions.get(subscriptionName);
            if (subscription == null) {
                MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl)startMessageId : (MessageIdImpl)MessageId.latest;
                long ledgerId = msgId.getLedgerId();
                long entryId = msgId.getEntryId();
                if (ledgerId >= 0L && entryId >= 0L && msgId instanceof BatchMessageIdImpl) {
                    entryId = msgId.getEntryId() - 1L;
                }
                PositionImpl startPosition = new PositionImpl(ledgerId, entryId);
                ManagedCursor cursor = null;
                try {
                    cursor = this.ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition, isReadCompacted);
                }
                catch (ManagedLedgerException e) {
                    return FutureUtil.failedFuture(e);
                }
                subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
                this.subscriptions.put(subscriptionName, subscription);
            } else if (subscription.getCursor() != null && subscription.getCursor().isDurable()) {
                return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Durable subscription with the same name already exists."));
            }
            if (startMessageRollbackDurationSec > 0L) {
                this.resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec);
                return subscriptionFuture;
            }
            return CompletableFuture.completedFuture(subscription);
        }
    }

    private void resetSubscriptionCursor(Subscription subscription, CompletableFuture<Subscription> subscriptionFuture, long startMessageRollbackDurationSec) {
        long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
        Subscription finalSubscription = subscription;
        subscription.resetCursor(timestamp).handle((s, ex) -> {
            if (ex != null) {
                log.warn("[{}] Failed to reset cursor {} position at timestamp {}, caused by {}", new Object[]{this.topic, subscription.getName(), startMessageRollbackDurationSec, ex.getMessage()});
            }
            subscriptionFuture.complete(finalSubscription);
            return null;
        });
    }

    @Override
    public CompletableFuture<Subscription> createSubscription(String subscriptionName, CommandSubscribe.InitialPosition initialPosition, boolean replicateSubscriptionState) {
        return this.getDurableSubscription(subscriptionName, initialPosition, 0L, replicateSubscriptionState, null);
    }

    @Override
    public CompletableFuture<Void> unsubscribe(final String subscriptionName) {
        final CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        if (this.brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
            this.getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(this.topic, Codec.encode(subscriptionName))).getPersistenceNamingEncoding(), new AsyncCallbacks.DeleteLedgerCallback(){

                @Override
                public void deleteLedgerComplete(Object ctx) {
                    PersistentTopic.this.asyncDeleteCursor(subscriptionName, unsubscribeFuture);
                }

                @Override
                public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                    if (exception instanceof ManagedLedgerException.MetadataNotFoundException) {
                        PersistentTopic.this.asyncDeleteCursor(subscriptionName, unsubscribeFuture);
                        return;
                    }
                    unsubscribeFuture.completeExceptionally(exception);
                    log.error("[{}][{}] Error deleting subscription pending ack store", new Object[]{PersistentTopic.this.topic, subscriptionName, exception});
                }
            }, null);
        } else {
            this.asyncDeleteCursor(subscriptionName, unsubscribeFuture);
        }
        return unsubscribeFuture;
    }

    private void asyncDeleteCursor(final String subscriptionName, final CompletableFuture<Void> unsubscribeFuture) {
        this.ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new AsyncCallbacks.DeleteCursorCallback(){

            @Override
            public void deleteCursorComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Cursor deleted successfully", (Object)PersistentTopic.this.topic, (Object)subscriptionName);
                }
                PersistentTopic.this.removeSubscription(subscriptionName);
                unsubscribeFuture.complete(null);
                PersistentTopic.this.lastActive = System.nanoTime();
            }

            @Override
            public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Error deleting cursor for subscription", new Object[]{PersistentTopic.this.topic, subscriptionName, exception});
                }
                if (exception instanceof ManagedLedgerException.ManagedLedgerNotFoundException) {
                    unsubscribeFuture.complete(null);
                    PersistentTopic.this.lastActive = System.nanoTime();
                    return;
                }
                unsubscribeFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        }, null);
    }

    void removeSubscription(String subscriptionName) {
        PersistentSubscription sub = this.subscriptions.remove(subscriptionName);
        SubscriptionStatsImpl stats = sub.getStats(false, false, false);
        this.bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
        this.msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
    }

    @Override
    public CompletableFuture<Void> delete() {
        return this.delete(false, false, false);
    }

    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean deleteSchema) {
        return this.delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema);
    }

    @Override
    public CompletableFuture<Void> deleteForcefully() {
        return this.delete(false, false, true, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean closeIfClientsConnected, boolean deleteSchema) {
        final CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        this.lock.writeLock().lock();
        try {
            if (this.isClosingOrDeleting) {
                log.warn("[{}] Topic is already being closed or deleted", (Object)this.topic);
                CompletableFuture<Void> completableFuture = FutureUtil.failedFuture(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return completableFuture;
            }
            if (failIfHasSubscriptions && !this.subscriptions.isEmpty()) {
                CompletableFuture<Void> completableFuture = FutureUtil.failedFuture(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                return completableFuture;
            }
            if (failIfHasBacklogs && this.hasBacklogs()) {
                CompletableFuture<Void> completableFuture = FutureUtil.failedFuture(new BrokerServiceException.TopicBusyException("Topic has subscriptions did not catch up"));
                return completableFuture;
            }
            this.fenceTopicToCloseOrDelete();
            CompletableFuture closeClientFuture = new CompletableFuture();
            if (closeIfClientsConnected) {
                ArrayList futures = Lists.newArrayList();
                this.replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
                this.producers.values().forEach(producer -> futures.add(producer.disconnect()));
                this.subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
                ((CompletableFuture)FutureUtil.waitForAll(futures).thenRun(() -> closeClientFuture.complete(null))).exceptionally(ex -> {
                    log.error("[{}] Error closing clients", (Object)this.topic, ex);
                    this.unfenceTopicToResume();
                    closeClientFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            } else {
                closeClientFuture.complete(null);
            }
            ((CompletableFuture)closeClientFuture.thenAccept(delete -> {
                if (this.currentUsageCount() == 0L || closeIfClientsConnected && !failIfHasSubscriptions) {
                    CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<Void>();
                    this.brokerService.deleteTopicAuthenticationWithRetry(this.topic, deleteTopicAuthenticationFuture, 5);
                    ((CompletableFuture)((CompletableFuture)((CompletableFuture)deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? this.deleteSchema() : CompletableFuture.completedFuture(null))).thenAccept(__ -> this.deleteTopicPolicies())).thenCompose(__ -> this.transactionBuffer.clearSnapshot())).whenComplete((v, ex) -> {
                        if (ex != null) {
                            log.error("[{}] Error deleting topic", (Object)this.topic, ex);
                            this.unfenceTopicToResume();
                            deleteFuture.completeExceptionally((Throwable)ex);
                        } else {
                            ArrayList subsDeleteFutures = new ArrayList();
                            this.subscriptions.forEach((sub, p) -> subsDeleteFutures.add(this.unsubscribe((String)sub)));
                            FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                if (e != null) {
                                    log.error("[{}] Error deleting topic", (Object)this.topic, e);
                                    this.unfenceTopicToResume();
                                    deleteFuture.completeExceptionally((Throwable)e);
                                } else {
                                    this.ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

                                        @Override
                                        public void deleteLedgerComplete(Object ctx) {
                                            PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                                            PersistentTopic.this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
                                            PersistentTopic.this.subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
                                            PersistentTopic.this.unregisterTopicPolicyListener();
                                            log.info("[{}] Topic deleted", (Object)PersistentTopic.this.topic);
                                            deleteFuture.complete(null);
                                        }

                                        @Override
                                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                                            if (exception.getCause() instanceof MetadataStoreException.NotFoundException) {
                                                log.info("[{}] Topic is already deleted {}", (Object)PersistentTopic.this.topic, (Object)exception.getMessage());
                                                this.deleteLedgerComplete(ctx);
                                            } else {
                                                PersistentTopic.this.unfenceTopicToResume();
                                                log.error("[{}] Error deleting topic", (Object)PersistentTopic.this.topic, (Object)exception);
                                                deleteFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                                            }
                                        }
                                    }, null);
                                }
                            });
                        }
                    });
                } else {
                    this.unfenceTopicToResume();
                    deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + this.currentUsageCount() + " connected producers/consumers"));
                }
            })).exceptionally(ex -> {
                this.unfenceTopicToResume();
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return deleteFuture;
    }

    public CompletableFuture<Void> close() {
        return this.close(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        CompletableFuture<Void> closeFuture;
        block8: {
            closeFuture = new CompletableFuture<Void>();
            this.lock.writeLock().lock();
            try {
                if (!this.isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
                    this.fenceTopicToCloseOrDelete();
                    break block8;
                }
                log.warn("[{}] Topic is already being closed or deleted", (Object)this.topic);
                closeFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                CompletableFuture<Void> completableFuture = closeFuture;
                return completableFuture;
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayList();
        futures.add(this.transactionBuffer.closeAsync());
        this.replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
        this.producers.values().forEach(producer -> futures.add(producer.disconnect()));
        if (this.topicPublishRateLimiter != null) {
            try {
                this.topicPublishRateLimiter.close();
            }
            catch (Exception e) {
                log.warn("Error closing topicPublishRateLimiter for topic {}", (Object)this.topic, (Object)e);
            }
        }
        this.subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
        if (this.resourceGroupPublishLimiter != null) {
            this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
        }
        CompletableFuture<Object> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) : FutureUtil.waitForAll(futures);
        ((CompletableFuture)clientCloseFuture.thenRun(() -> this.ledger.asyncClose(new AsyncCallbacks.CloseCallback(){

            @Override
            public void closeComplete(Object ctx) {
                ((CompletableFuture)PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic).thenRun(() -> {
                    PersistentTopic.this.replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
                    PersistentTopic.this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
                    PersistentTopic.this.subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
                    PersistentTopic.this.unregisterTopicPolicyListener();
                    log.info("[{}] Topic closed", (Object)PersistentTopic.this.topic);
                    closeFuture.complete(null);
                })).exceptionally(ex -> {
                    closeFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            }

            @Override
            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}] Failed to close managed ledger, proceeding anyway.", (Object)PersistentTopic.this.topic, (Object)exception);
                PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                closeFuture.complete(null);
            }
        }, null))).exceptionally(exception -> {
            log.error("[{}] Error closing topic", (Object)this.topic, exception);
            this.unfenceTopicToResume();
            closeFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return closeFuture;
    }

    @VisibleForTesting
    CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        ((CompletableFuture)this.checkReplication().thenAccept(res -> {
            log.info("[{}] Policies updated successfully", (Object)this.topic);
            result.complete(null);
        })).exceptionally(th -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", new Object[]{this.topic, th.getMessage(), 60L, th});
            if (!(th.getCause() instanceof BrokerServiceException.TopicFencedException)) {
                this.brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, 60L, TimeUnit.SECONDS);
            }
            result.completeExceptionally((Throwable)th);
            return null;
        });
        return result;
    }

    public CompletableFuture<Void> checkDeduplicationStatus() {
        return this.messageDeduplication.checkStatus();
    }

    private CompletableFuture<Void> checkPersistencePolicies() {
        TopicName topicName = TopicName.get(this.topic);
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ((CompletableFuture)this.brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
            this.ledger.setConfig((ManagedLedgerConfig)config);
            future.complete(null);
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to update persistence-policies {}", (Object)this.topic, (Object)ex.getMessage());
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    @Override
    public CompletableFuture<Void> checkReplication() {
        TopicName name = TopicName.get(this.topic);
        if (!name.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        NamespaceName heartbeatNamespace = this.brokerService.pulsar().getHeartbeatNamespaceV2();
        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", (Object)name);
        }
        List<String> configuredClusters = this.topicPolicies.getReplicationClusters().get();
        int newMessageTTLinSeconds = this.topicPolicies.getMessageTTLInSeconds().get();
        String localCluster = this.brokerService.pulsar().getConfiguration().getClusterName();
        if (TopicName.get(this.topic).isGlobal() && !configuredClusters.contains(localCluster)) {
            log.info("Deleting topic [{}] because local cluster is not part of  global namespace repl list {}", (Object)this.topic, configuredClusters);
            return this.deleteForcefully();
        }
        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayList();
        for (String cluster2 : configuredClusters) {
            if (cluster2.equals(localCluster) || this.replicators.containsKey(cluster2)) continue;
            futures.add(this.startReplicator(cluster2));
        }
        this.replicators.forEach((cluster, replicator) -> {
            ((PersistentReplicator)replicator).updateMessageTTL(newMessageTTLinSeconds);
            if (!cluster.equals(localCluster) && !configuredClusters.contains(cluster)) {
                futures.add(this.removeReplicator((String)cluster));
            }
        });
        return FutureUtil.waitForAll(futures);
    }

    @Override
    public void checkMessageExpiry() {
        int messageTtlInSeconds = this.topicPolicies.getMessageTTLInSeconds().get();
        if (messageTtlInSeconds != 0) {
            this.subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
            this.replicators.forEach((__, replicator) -> ((PersistentReplicator)replicator).expireMessages(messageTtlInSeconds));
        }
    }

    @Override
    public void checkMessageDeduplicationInfo() {
        this.messageDeduplication.purgeInactiveProducers();
    }

    public CompletableFuture<Boolean> isCompactionEnabled() {
        Optional<Long> topicCompactionThreshold = this.getTopicPolicies().map(TopicPolicies::getCompactionThreshold);
        if (topicCompactionThreshold.isPresent() && topicCompactionThreshold.get() > 0L) {
            return CompletableFuture.completedFuture(true);
        }
        TopicName topicName = TopicName.get(this.topic);
        return this.brokerService.getPulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenApply(policies -> {
            if (policies.isPresent()) {
                return ((Policies)policies.get()).compaction_threshold != null && ((Policies)policies.get()).compaction_threshold > 0L;
            }
            return this.brokerService.pulsar().getConfiguration().getBrokerServiceCompactionThresholdInBytes() > 0L;
        });
    }

    public void checkCompaction() {
        TopicName name = TopicName.get(this.topic);
        try {
            long compactionThreshold = this.topicPolicies.getCompactionThreshold().get();
            if (this.isSystemTopic() || compactionThreshold != 0L && this.currentCompaction.isDone()) {
                long backlogEstimate = 0L;
                PersistentSubscription compactionSub = this.subscriptions.get("__compaction");
                if (compactionSub != null) {
                    backlogEstimate = compactionSub.estimateBacklogSize();
                } else {
                    long l = backlogEstimate = this.subscriptions.isEmpty() || this.subscriptions.values().stream().noneMatch(sub -> sub.getCursor().isDurable()) ? this.ledger.getTotalSize() : this.ledger.getEstimatedBacklogSize();
                }
                if (backlogEstimate > compactionThreshold) {
                    try {
                        this.triggerCompaction();
                    }
                    catch (BrokerServiceException.AlreadyRunningException are) {
                        log.debug("[{}] Compaction already running, so don't trigger again, even though backlog({}) is over threshold({})", new Object[]{name, backlogEstimate, compactionThreshold});
                    }
                }
            }
        }
        catch (Exception e) {
            log.debug("[{}] Error getting policies", (Object)this.topic);
        }
    }

    public CompletableFuture<Void> preCreateSubscriptionForCompactionIfNeeded() {
        if (this.subscriptions.containsKey("__compaction")) {
            return CompletableFuture.completedFuture(null);
        }
        return this.isCompactionEnabled().thenCompose(enabled -> {
            if (enabled.booleanValue()) {
                return this.createSubscription("__compaction", CommandSubscribe.InitialPosition.Earliest, false).thenCompose(__ -> CompletableFuture.completedFuture(null));
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    CompletableFuture<Void> startReplicator(final String remoteCluster) {
        log.info("[{}] Starting replicator to remote: {}", (Object)this.topic, (Object)remoteCluster);
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        String name = PersistentReplicator.getReplicatorName(this.replicatorPrefix, remoteCluster);
        this.ledger.asyncOpenCursor(name, new AsyncCallbacks.OpenCursorCallback(){

            @Override
            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                String localCluster = PersistentTopic.this.brokerService.pulsar().getConfiguration().getClusterName();
                PersistentTopic.this.addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, ex) -> {
                    if (ex == null) {
                        future.complete(null);
                    } else {
                        future.completeExceptionally((Throwable)ex);
                    }
                });
            }

            @Override
            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        }, null);
        return future;
    }

    protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
        return ((CompletableFuture)AbstractReplicator.validatePartitionedTopicAsync(this.getName(), this.brokerService).thenCompose(__ -> this.brokerService.pulsar().getPulsarResources().getClusterResources().getClusterAsync(remoteCluster).thenApply(clusterData -> this.brokerService.getReplicationClient(remoteCluster, (Optional<ClusterData>)clusterData)))).thenAccept(replicationClient -> {
            Replicator replicator = this.replicators.computeIfAbsent(remoteCluster, r -> {
                try {
                    return new PersistentReplicator(this, cursor, localCluster, remoteCluster, this.brokerService, (PulsarClientImpl)replicationClient);
                }
                catch (PulsarServerException e) {
                    log.error("[{}] Replicator startup failed {}", new Object[]{this.topic, remoteCluster, e});
                    return null;
                }
            });
            if (replicator == null) {
                this.replicators.removeNullValue(remoteCluster);
            }
        });
    }

    CompletableFuture<Void> removeReplicator(final String remoteCluster) {
        log.info("[{}] Removing replicator to {}", (Object)this.topic, (Object)remoteCluster);
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        final String name = PersistentReplicator.getReplicatorName(this.replicatorPrefix, remoteCluster);
        ((CompletableFuture)this.replicators.get(remoteCluster).disconnect().thenRun(() -> this.ledger.asyncDeleteCursor(name, new AsyncCallbacks.DeleteCursorCallback(){

            @Override
            public void deleteCursorComplete(Object ctx) {
                PersistentTopic.this.replicators.remove(remoteCluster);
                future.complete(null);
            }

            @Override
            public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}] Failed to delete cursor {} {}", new Object[]{PersistentTopic.this.topic, name, exception.getMessage(), exception});
                future.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        }, null))).exceptionally(e -> {
            log.error("[{}] Failed to close replication producer {} {}", new Object[]{this.topic, name, e.getMessage(), e});
            future.completeExceptionally((Throwable)e);
            return null;
        });
        return future;
    }

    public boolean isDeduplicationEnabled() {
        return this.messageDeduplication.isEnabled();
    }

    @Override
    public int getNumberOfConsumers() {
        int count = 0;
        for (PersistentSubscription subscription : this.subscriptions.values()) {
            count += subscription.getConsumers().size();
        }
        return count;
    }

    @Override
    public int getNumberOfSameAddressConsumers(String clientAddress) {
        return this.getNumberOfSameAddressConsumers(clientAddress, this.subscriptions.values());
    }

    public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    @Override
    public PersistentSubscription getSubscription(String subscriptionName) {
        return this.subscriptions.get(subscriptionName);
    }

    public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
        return this.replicators;
    }

    public Replicator getPersistentReplicator(String remoteCluster) {
        return this.replicators.get(remoteCluster);
    }

    public ManagedLedger getManagedLedger() {
        return this.ledger;
    }

    @Override
    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
        this.publishRateLimitedTimes = 0L;
        TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
        topicStatsHelper.reset();
        this.replicators.forEach((region, replicator) -> replicator.updateRates());
        nsStats.producerCount += this.producers.size();
        bundleStats.producerCount += this.producers.size();
        topicStatsStream.startObject(this.topic);
        topicStatsStream.startList("publishers");
        this.producers.values().forEach(producer -> {
            producer.updateRates();
            PublisherStatsImpl publisherStats = producer.getStats();
            topicStatsHelper.aggMsgRateIn += publisherStats.msgRateIn;
            topicStatsHelper.aggMsgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            }
            if (hydratePublishers) {
                StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
            }
        });
        topicStatsStream.endList();
        this.lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > this.lastUpdatedAvgPublishRateInMsg ? topicStatsHelper.aggMsgRateIn : (topicStatsHelper.aggMsgRateIn + this.lastUpdatedAvgPublishRateInMsg) / 2.0;
        this.lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > this.lastUpdatedAvgPublishRateInByte ? topicStatsHelper.aggMsgThroughputIn : (topicStatsHelper.aggMsgThroughputIn + this.lastUpdatedAvgPublishRateInByte) / 2.0;
        topicStatsStream.startObject("replication");
        nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
        this.replicators.forEach((cluster, replicator) -> {
            try {
                ((PersistentReplicator)replicator).updateCursorState();
            }
            catch (Exception e) {
                log.warn("[{}] Failed to update cursro state ", (Object)this.topic, (Object)e);
            }
            ReplicatorStatsImpl rStat = replicator.getStats();
            PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
            rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0.0;
            rStat.msgThroughputIn = pubStats != null ? pubStats.msgThroughputIn : 0.0;
            rStat.inboundConnection = pubStats != null ? pubStats.getAddress() : null;
            rStat.inboundConnectedSince = pubStats != null ? pubStats.getConnectedSince() : null;
            topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
            topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;
            topicStatsStream.startObject((String)cluster);
            topicStatsStream.writePair("connected", rStat.connected);
            topicStatsStream.writePair("msgRateExpired", rStat.msgRateExpired);
            topicStatsStream.writePair("msgRateIn", rStat.msgRateIn);
            topicStatsStream.writePair("msgRateOut", rStat.msgRateOut);
            topicStatsStream.writePair("msgThroughputIn", rStat.msgThroughputIn);
            topicStatsStream.writePair("msgThroughputOut", rStat.msgThroughputOut);
            topicStatsStream.writePair("replicationBacklog", rStat.replicationBacklog);
            topicStatsStream.writePair("replicationDelayInSeconds", rStat.replicationDelayInSeconds);
            topicStatsStream.writePair("inboundConnection", rStat.inboundConnection);
            topicStatsStream.writePair("inboundConnectedSince", rStat.inboundConnectedSince);
            topicStatsStream.writePair("outboundConnection", rStat.outboundConnection);
            topicStatsStream.writePair("outboundConnectedSince", rStat.outboundConnectedSince);
            topicStatsStream.endObject();
            nsStats.msgReplBacklog += (double)rStat.replicationBacklog;
            if (replStats.isMetricsEnabled()) {
                String namespaceClusterKey = replStats.getKeyName(namespace, (String)cluster);
                ReplicationMetrics replicationMetrics = replStats.get(namespaceClusterKey);
                boolean update = false;
                if (replicationMetrics == null) {
                    replicationMetrics = ReplicationMetrics.get();
                    update = true;
                }
                replicationMetrics.connected = replicationMetrics.connected + (rStat.connected ? 1 : 0);
                replicationMetrics.msgRateOut += rStat.msgRateOut;
                replicationMetrics.msgThroughputOut += rStat.msgThroughputOut;
                replicationMetrics.msgReplBacklog += (double)rStat.replicationBacklog;
                if (update) {
                    replStats.put(namespaceClusterKey, replicationMetrics);
                }
                if ((double)rStat.replicationDelayInSeconds > replicationMetrics.maxMsgReplDelayInSeconds) {
                    replicationMetrics.maxMsgReplDelayInSeconds = rStat.replicationDelayInSeconds;
                }
            }
        });
        topicStatsStream.endObject();
        topicStatsStream.startObject("subscriptions");
        nsStats.subsCount = (int)((long)nsStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            double subMsgRateOut = 0.0;
            double subMsgThroughputOut = 0.0;
            double subMsgRateRedeliver = 0.0;
            try {
                topicStatsStream.startObject((String)subscriptionName);
                topicStatsStream.startList("consumers");
                for (Consumer consumer : subscription.getConsumers()) {
                    ++nsStats.consumerCount;
                    ++bundleStats.consumerCount;
                    consumer.updateRates();
                    ConsumerStatsImpl consumerStats = consumer.getStats();
                    subMsgRateOut += consumerStats.msgRateOut;
                    subMsgThroughputOut += consumerStats.msgThroughputOut;
                    subMsgRateRedeliver += consumerStats.msgRateRedeliver;
                    StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
                }
                topicStatsStream.endList();
                topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(true));
                topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
                topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
                topicStatsStream.writePair("type", subscription.getTypeString());
                if (Subscription.isIndividualAckMode(subscription.getType()) && subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                    PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subscription.getDispatcher();
                    topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs", dispatcher.isBlockedDispatcherOnUnackedMsgs());
                    topicStatsStream.writePair("unackedMessages", dispatcher.getTotalUnackedMessages());
                }
                topicStatsStream.endObject();
                topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                nsStats.msgBacklog += (double)subscription.getNumberOfEntriesInBacklog(false);
                if (this.brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
                    subscription.checkAndUnblockIfStuck();
                }
            }
            catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", new Object[]{subscriptionName, e.getMessage(), e});
            }
        });
        topicStatsStream.endObject();
        topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0 : topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn;
        topicStatsStream.writePair("producerCount", this.producers.size());
        topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
        topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
        topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
        topicStatsStream.writePair("msgInCount", this.getMsgInCounter());
        topicStatsStream.writePair("bytesInCount", this.getBytesInCounter());
        topicStatsStream.writePair("msgOutCount", this.getMsgOutCounter());
        topicStatsStream.writePair("bytesOutCount", this.getBytesOutCounter());
        topicStatsStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
        topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
        topicStatsStream.writePair("storageSize", this.ledger.getTotalSize());
        topicStatsStream.writePair("backlogSize", this.ledger.getEstimatedBacklogSize());
        topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl)this.ledger).getPendingAddEntriesCount());
        nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        nsStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        nsStats.storageSize += (double)this.ledger.getEstimatedBacklogSize();
        bundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        bundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        bundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        bundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        bundleStats.cacheSize += ((ManagedLedgerImpl)this.ledger).getCacheSize();
        topicStatsStream.endObject();
        this.addEntryLatencyStatsUsec.refresh();
        NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
        this.addEntryLatencyStatsUsec.reset();
    }

    public double getLastUpdatedAvgPublishRateInMsg() {
        return this.lastUpdatedAvgPublishRateInMsg;
    }

    public double getLastUpdatedAvgPublishRateInByte() {
        return this.lastUpdatedAvgPublishRateInByte;
    }

    @Override
    public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
        try {
            return this.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get();
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("[{}] Fail to get stats", (Object)this.topic, (Object)e);
            return null;
        }
    }

    public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
        CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<TopicStatsImpl>();
        TopicStatsImpl stats = new TopicStatsImpl();
        ObjectObjectHashMap remotePublishersStats = new ObjectObjectHashMap();
        this.producers.values().forEach(producer -> {
            PublisherStatsImpl publisherStats = producer.getStats();
            stats.msgRateIn += publisherStats.msgRateIn;
            stats.msgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            } else {
                stats.publishers.add(publisherStats);
            }
        });
        stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : stats.msgThroughputIn / stats.msgRateIn;
        stats.msgInCounter = this.getMsgInCounter();
        stats.bytesInCounter = this.getBytesInCounter();
        stats.msgChunkPublished = this.msgChunkPublished;
        stats.waitingPublishers = this.getWaitingProducersCount();
        stats.bytesOutCounter = this.bytesOutFromRemovedSubscriptions.longValue();
        stats.msgOutCounter = this.msgOutFromRemovedSubscriptions.longValue();
        stats.publishRateLimitedTimes = this.publishRateLimitedTimes;
        this.subscriptions.forEach((name, subscription) -> {
            SubscriptionStatsImpl subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
            stats.msgRateOut += subStats.msgRateOut;
            stats.msgThroughputOut += subStats.msgThroughputOut;
            stats.bytesOutCounter += subStats.bytesOutCounter;
            stats.msgOutCounter += subStats.msgOutCounter;
            stats.subscriptions.put((String)name, subStats);
            stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
            stats.nonContiguousDeletedMessagesRangesSerializedSize += subStats.nonContiguousDeletedMessagesRangesSerializedSize;
        });
        this.replicators.forEach((cluster, replicator) -> {
            ReplicatorStatsImpl replicatorStats = replicator.getStats();
            PublisherStatsImpl pubStats = (PublisherStatsImpl)remotePublishersStats.get(replicator.getRemoteCluster());
            if (pubStats != null) {
                replicatorStats.msgRateIn = pubStats.msgRateIn;
                replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
                replicatorStats.inboundConnection = pubStats.getAddress();
                replicatorStats.inboundConnectedSince = pubStats.getConnectedSince();
            }
            stats.msgRateOut += replicatorStats.msgRateOut;
            stats.msgThroughputOut += replicatorStats.msgThroughputOut;
            stats.replication.put(replicator.getRemoteCluster(), replicatorStats);
        });
        stats.storageSize = this.ledger.getTotalSize();
        stats.backlogSize = this.ledger.getEstimatedBacklogSize();
        stats.deduplicationStatus = this.messageDeduplication.getStatus().toString();
        stats.topicEpoch = this.topicEpoch.orElse(null);
        stats.offloadedStorageSize = this.ledger.getOffloadedSize();
        stats.lastOffloadLedgerId = this.ledger.getLastOffloadedLedgerId();
        stats.lastOffloadSuccessTimeStamp = this.ledger.getLastOffloadedSuccessTimestamp();
        stats.lastOffloadFailureTimeStamp = this.ledger.getLastOffloadedFailureTimestamp();
        Optional<CompactorMXBean> mxBean = this.getCompactorMXBean();
        stats.compaction.reset();
        mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(this.topic)).map(compactionRecord -> {
            stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
            stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
            stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
            stats.compaction.lastCompactionDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
            return compactionRecord;
        });
        if (getEarliestTimeInBacklog && stats.backlogSize != 0L) {
            this.ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> {
                if (e != null) {
                    log.error("[{}] Failed to get earliest message publish time in backlog", (Object)this.topic, e);
                    statsFuture.completeExceptionally((Throwable)e);
                } else {
                    stats.earliestMsgPublishTimeInBacklogs = earliestTime;
                    statsFuture.complete(stats);
                }
            });
        } else {
            statsFuture.complete(stats);
        }
        return statsFuture;
    }

    private Optional<CompactorMXBean> getCompactorMXBean() {
        Compactor compactor = null;
        try {
            compactor = this.brokerService.pulsar().getCompactor(false);
        }
        catch (PulsarServerException ex) {
            log.warn("get compactor error", (Throwable)ex);
        }
        return Optional.ofNullable(compactor).map(c -> c.getStats());
    }

    @Override
    public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
        String schemaId;
        CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<PersistentTopicInternalStats>();
        PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
        ManagedLedgerImpl ml = (ManagedLedgerImpl)this.ledger;
        stats.entriesAddedCounter = ml.getEntriesAddedCounter();
        stats.numberOfEntries = ml.getNumberOfEntries();
        stats.totalSize = ml.getTotalSize();
        stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
        stats.currentLedgerSize = ml.getCurrentLedgerSize();
        stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
        if (ml.getLastLedgerCreationFailureTimestamp() != 0L) {
            stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
        }
        stats.waitingCursorsCount = ml.getWaitingCursorsCount();
        stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();
        stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
        stats.state = ml.getState().toString();
        stats.ledgers = Lists.newArrayList();
        ArrayList<CompletionStage> futures = Lists.newArrayList();
        CompletableFuture<Set<String>> availableBookiesFuture = this.brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
        availableBookiesFuture.whenComplete((bookies, e) -> {
            if (e != null) {
                log.error("[{}] Failed to fetch available bookies.", (Object)this.topic, e);
                statFuture.completeExceptionally((Throwable)e);
            } else {
                ml.getLedgersInfo().forEach((id, li) -> {
                    ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
                    info.ledgerId = li.getLedgerId();
                    info.entries = li.getEntries();
                    info.size = li.getSize();
                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
                    stats.ledgers.add(info);
                    if (includeLedgerMetadata) {
                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
                            if (ex == null) {
                                info.metadata = lMetadata;
                            }
                            return null;
                        }));
                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
                            if (ex == null) {
                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString).collect(Collectors.toList()));
                            }
                            return null;
                        }));
                    }
                });
            }
        });
        ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
        info.ledgerId = -1L;
        info.entries = -1L;
        info.size = -1L;
        Optional<CompactedTopicContext> compactedTopicContext = this.getCompactedTopicContext();
        if (compactedTopicContext.isPresent()) {
            CompactedTopicContext ledgerContext = compactedTopicContext.get();
            info.ledgerId = ledgerContext.getLedger().getId();
            info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1L;
            info.size = ledgerContext.getLedger().getLength();
        }
        stats.compactedLedger = info;
        stats.cursors = Maps.newTreeMap();
        ml.getCursors().forEach(c -> {
            ManagedCursorImpl cursor = (ManagedCursorImpl)c;
            ManagedLedgerInternalStats.CursorStats cs = new ManagedLedgerInternalStats.CursorStats();
            cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
            cs.readPosition = cursor.getReadPosition().toString();
            cs.waitingReadOp = cursor.hasPendingReadRequest();
            cs.pendingReadOps = cursor.getPendingReadOpsCount();
            cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
            cs.cursorLedger = cursor.getCursorLedger();
            cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
            cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
            cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
            cs.state = cursor.getState();
            cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
            cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
            cs.properties = cursor.getProperties();
            PersistentSubscription sub = this.subscriptions.get(Codec.decode(c.getName()));
            if (sub != null) {
                if (sub.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                    PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)sub.getDispatcher();
                    cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
                    cs.subscriptionHavePendingReplayRead = dispatcher.havePendingReplayRead;
                } else if (sub.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
                    PersistentDispatcherSingleActiveConsumer dispatcher = (PersistentDispatcherSingleActiveConsumer)sub.getDispatcher();
                    cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
                }
            }
            stats.cursors.put(cursor.getName(), cs);
        });
        try {
            schemaId = TopicName.get(this.topic).getSchemaName();
        }
        catch (Throwable t) {
            statFuture.completeExceptionally(t);
            return statFuture;
        }
        CompletableFuture schemaStoreLedgersFuture = new CompletableFuture();
        stats.schemaLedgers = Collections.synchronizedList(new ArrayList());
        if (this.brokerService.getPulsar().getSchemaStorage() != null && this.brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
            ((CompletableFuture)((BookkeeperSchemaStorage)this.brokerService.getPulsar().getSchemaStorage()).getStoreLedgerIdsBySchemaId(schemaId).thenAccept(ledgers -> {
                ArrayList getLedgerMetadataFutures = new ArrayList();
                ledgers.forEach(ledgerId -> {
                    CompletableFuture<LedgerMetadata> metadataFuture;
                    CompletableFuture<Object> completableFuture;
                    block4: {
                        completableFuture = new CompletableFuture<Object>();
                        getLedgerMetadataFutures.add(completableFuture);
                        metadataFuture = null;
                        try {
                            metadataFuture = this.brokerService.getPulsar().getBookKeeperClient().getLedgerMetadata((long)ledgerId);
                        }
                        catch (NullPointerException e2) {
                            if (!log.isDebugEnabled()) break block4;
                            log.debug("{{}} Failed to get ledger metadata for the schema ledger {}", new Object[]{this.topic, ledgerId, e2});
                        }
                    }
                    if (metadataFuture != null) {
                        ((CompletableFuture)metadataFuture.thenAccept(metadata -> {
                            ManagedLedgerInternalStats.LedgerInfo schemaLedgerInfo = new ManagedLedgerInternalStats.LedgerInfo();
                            schemaLedgerInfo.ledgerId = metadata.getLedgerId();
                            schemaLedgerInfo.entries = metadata.getLastEntryId() + 1L;
                            schemaLedgerInfo.size = metadata.getLength();
                            if (includeLedgerMetadata) {
                                info.metadata = metadata.toSafeString();
                            }
                            stats.schemaLedgers.add(schemaLedgerInfo);
                            completableFuture.complete(null);
                        })).exceptionally(e -> {
                            completableFuture.completeExceptionally((Throwable)e);
                            return null;
                        });
                    } else {
                        completableFuture.complete(null);
                    }
                });
                ((CompletableFuture)FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> schemaStoreLedgersFuture.complete(null))).exceptionally(e -> {
                    schemaStoreLedgersFuture.completeExceptionally((Throwable)e);
                    return null;
                });
            })).exceptionally(e -> {
                schemaStoreLedgersFuture.completeExceptionally((Throwable)e);
                return null;
            });
        } else {
            schemaStoreLedgersFuture.complete(null);
        }
        ((CompletableFuture)schemaStoreLedgersFuture.thenRun(() -> {
            if (futures != null) {
                FutureUtil.waitForAll(futures).handle((res, ex) -> {
                    statFuture.complete(stats);
                    return null;
                });
            } else {
                statFuture.complete(stats);
            }
        })).exceptionally(e -> {
            statFuture.completeExceptionally((Throwable)e);
            return null;
        });
        return statFuture;
    }

    public Optional<CompactedTopicContext> getCompactedTopicContext() {
        try {
            return ((CompactedTopicImpl)this.compactedTopic).getCompactedTopicContext();
        }
        catch (InterruptedException | ExecutionException e) {
            log.warn("[{}]Fail to get ledger information for compacted topic.", (Object)this.topic);
            return Optional.empty();
        }
    }

    public long getBacklogSize() {
        return this.ledger.getEstimatedBacklogSize();
    }

    public boolean isActive(InactiveTopicDeleteMode deleteMode) {
        switch (deleteMode) {
            case delete_when_no_subscriptions: {
                if (this.subscriptions.isEmpty()) break;
                return true;
            }
            case delete_when_subscriptions_caught_up: {
                if (!this.hasBacklogs()) break;
                return true;
            }
        }
        if (TopicName.get(this.topic).isGlobal()) {
            return this.hasLocalProducers();
        }
        return this.currentUsageCount() != 0L;
    }

    private boolean hasBacklogs() {
        return this.subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0L);
    }

    @Override
    public void checkGC() {
        if (!this.isDeleteWhileInactive()) {
            return;
        }
        InactiveTopicDeleteMode deleteMode = this.topicPolicies.getInactiveTopicPolicies().get().getInactiveTopicDeleteMode();
        int maxInactiveDurationInSec = this.topicPolicies.getInactiveTopicPolicies().get().getMaxInactiveDurationSeconds();
        if (this.isActive(deleteMode)) {
            this.lastActive = System.nanoTime();
        } else {
            if (System.nanoTime() - this.lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
                return;
            }
            if (this.shouldTopicBeRetained()) {
                return;
            }
            CompletableFuture replCloseFuture = new CompletableFuture();
            if (TopicName.get(this.topic).isGlobal()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", (Object)this.topic, (Object)maxInactiveDurationInSec);
                }
                ((CompletableFuture)this.closeReplProducersIfNoBacklog().thenRun(() -> {
                    if (this.hasRemoteProducers()) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC", (Object)this.topic);
                        }
                        replCloseFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has connected remote producers"));
                    } else {
                        log.info("[{}] Global topic inactive for {} seconds, closed repl producers", (Object)this.topic, (Object)maxInactiveDurationInSec);
                        replCloseFuture.complete(null);
                    }
                })).exceptionally(e -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", (Object)this.topic);
                    }
                    replCloseFuture.completeExceptionally(e.getCause());
                    return null;
                });
            } else {
                replCloseFuture.complete(null);
            }
            ((CompletableFuture)((CompletableFuture)((CompletableFuture)replCloseFuture.thenCompose(v -> this.delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true))).thenApply(res -> this.tryToDeletePartitionedMetadata())).thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", (Object)this.topic))).exceptionally(e -> {
                if (e.getCause() instanceof BrokerServiceException.TopicBusyException) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Did not delete busy topic: {}", (Object)this.topic, (Object)e.getCause().getMessage());
                    }
                } else {
                    log.warn("[{}] Inactive topic deletion failed", (Object)this.topic, e);
                }
                return null;
            });
        }
    }

    private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
        if (TopicName.get(this.topic).isPartitioned() && !this.deletePartitionedTopicMetadataWhileInactive()) {
            return CompletableFuture.completedFuture(null);
        }
        TopicName topicName = TopicName.get(TopicName.get(this.topic).getPartitionedTopicName());
        try {
            NamespaceResources.PartitionedTopicResources partitionedTopicResources = this.getBrokerService().pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
            if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<Void>();
            ((CompletableFuture)((CompletableFuture)this.getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())).thenAccept(metadata -> {
                for (int i = 0; i < metadata.partitions; ++i) {
                    if (!this.brokerService.getPulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicName.getPartition(i)).join().booleanValue()) continue;
                    throw new UnsupportedOperationException();
                }
            })).thenAccept(res -> ((CompletableFuture)partitionedTopicResources.deletePartitionedTopicAsync(topicName).thenAccept(r -> deleteMetadataFuture.complete(null))).exceptionally(ex -> {
                deleteMetadataFuture.completeExceptionally(ex.getCause());
                return null;
            }))).exceptionally(e -> {
                if (!(e.getCause() instanceof UnsupportedOperationException)) {
                    log.error("delete metadata fail", e);
                }
                deleteMetadataFuture.complete(null);
                return null;
            });
            return deleteMetadataFuture;
        }
        catch (Exception e2) {
            return FutureUtil.failedFuture(e2);
        }
    }

    @Override
    public void checkInactiveSubscriptions() {
        block3: {
            TopicName name = TopicName.get(this.topic);
            try {
                Policies policies = this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(name.getNamespaceObject()).orElseThrow(() -> new MetadataStoreException.NotFoundException());
                int defaultExpirationTime = this.brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes();
                Integer nsExpirationTime = policies.subscription_expiration_time_minutes;
                long expirationTimeMillis = TimeUnit.MINUTES.toMillis(nsExpirationTime == null ? (long)defaultExpirationTime : (long)nsExpirationTime.intValue());
                if (expirationTimeMillis > 0L) {
                    this.subscriptions.forEach((subName, sub) -> {
                        if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
                            return;
                        }
                        if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
                            sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration", (Object)this.topic, subName));
                        }
                    });
                }
            }
            catch (Exception e) {
                if (!log.isDebugEnabled()) break block3;
                log.debug("[{}] Error getting policies", (Object)this.topic);
            }
        }
    }

    @Override
    public void checkBackloggedCursors() {
        this.subscriptions.forEach((subName, subscription) -> {
            if (!subscription.getConsumers().isEmpty() && subscription.getCursor().getNumberOfEntries() < this.backloggedCursorThresholdEntries) {
                subscription.getCursor().setActive();
            } else {
                subscription.getCursor().setInactive();
            }
        });
    }

    @Override
    public void checkDeduplicationSnapshot() {
        this.messageDeduplication.takeSnapshot();
    }

    private boolean shouldTopicBeRetained() {
        RetentionPolicies retentionPolicies = this.topicPolicies.getRetentionPolicies().get();
        long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
        return retentionTime < 0L || System.nanoTime() - this.lastActive < retentionTime;
    }

    @Override
    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", new Object[]{this.topic, this.isEncryptionRequired, data.encryption_required});
        }
        if (data.deleted) {
            log.debug("Ignore the update because it has been deleted : {}", (Object)data);
            return CompletableFuture.completedFuture(null);
        }
        this.updateTopicPolicyByNamespacePolicy(data);
        this.isEncryptionRequired = data.encryption_required;
        this.isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
        this.schemaValidationEnforced = data.schema_validation_enforced;
        Optional<TopicPolicies> topicPolicies = this.getTopicPolicies();
        this.initializeRateLimiterIfNeeded(Optional.ofNullable(data));
        this.updateMaxPublishRate(data);
        ArrayList producerCheckFutures = new ArrayList(this.producers.size());
        this.producers.values().forEach(producer -> producerCheckFutures.add(producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
        return FutureUtil.waitForAll(producerCheckFutures).thenCompose(__ -> {
            ArrayList subscriptionCheckFutures = new ArrayList((int)this.subscriptions.size());
            this.subscriptions.forEach((subName, sub) -> {
                ArrayList consumerCheckFutures = new ArrayList(sub.getConsumers().size());
                sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
                subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
                    Dispatcher dispatcher = sub.getDispatcher();
                    if (!(dispatcher == null || topicPolicies.isPresent() && ((TopicPolicies)topicPolicies.get()).isSubscriptionDispatchRateSet())) {
                        dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
                    }
                }));
            });
            return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose(___ -> {
                this.replicators.forEach((name, replicator) -> replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
                this.checkMessageExpiry();
                CompletableFuture<Void> replicationFuture = this.checkReplicationAndRetryOnFailure();
                CompletableFuture<Void> dedupFuture = this.checkDeduplicationStatus();
                CompletableFuture<Void> persistentPoliciesFuture = this.checkPersistencePolicies();
                if (!(!this.dispatchRateLimiter.isPresent() || topicPolicies.isPresent() && ((TopicPolicies)topicPolicies.get()).isDispatchRateSet())) {
                    this.dispatchRateLimiter.get().onPoliciesUpdate(data);
                }
                if (this.subscribeRateLimiter.isPresent()) {
                    this.subscribeRateLimiter.get().onPoliciesUpdate(data);
                }
                return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture, this.preCreateSubscriptionForCompactionIfNeeded());
            });
        });
    }

    @Override
    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
        return this.topicPolicies.getBackLogQuotaMap().get((Object)backlogQuotaType).get();
    }

    @Override
    public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        BacklogQuota backlogQuota = this.getBacklogQuota(backlogQuotaType);
        if (backlogQuota != null) {
            BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();
            if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && this.isSizeBacklogExceeded()) {
                    log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", (Object)this.getName(), (Object)producerName);
                    return FutureUtil.failedFuture(new BrokerServiceException.TopicBacklogQuotaExceededException(retentionPolicy));
                }
                if (backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age) {
                    return this.checkTimeBacklogExceeded().thenCompose(isExceeded -> {
                        if (isExceeded.booleanValue()) {
                            log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", (Object)this.getName(), (Object)producerName);
                            return FutureUtil.failedFuture(new BrokerServiceException.TopicBacklogQuotaExceededException(retentionPolicy));
                        }
                        return CompletableFuture.completedFuture(null);
                    });
                }
            } else {
                return CompletableFuture.completedFuture(null);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    public boolean isSizeBacklogExceeded() {
        long backlogQuotaLimitInBytes = this.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
        if (backlogQuotaLimitInBytes < 0L) {
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] - backlog quota limit = [{}]", (Object)this.getName(), (Object)backlogQuotaLimitInBytes);
        }
        long storageSize = this.getBacklogSize();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Storage size = [{}], limit [{}]", new Object[]{this.getName(), storageSize, backlogQuotaLimitInBytes});
        }
        return storageSize >= backlogQuotaLimitInBytes;
    }

    public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
        final TopicName topicName = TopicName.get(this.getName());
        final int backlogQuotaLimitInSecond = this.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
        if (backlogQuotaLimitInSecond <= 0 || ((ManagedCursorContainer)this.ledger.getCursors()).getSlowestReaderPosition() == null) {
            return CompletableFuture.completedFuture(false);
        }
        if (this.brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
            final CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
            PositionImpl position = ((ManagedLedgerImpl)this.ledger).getNextValidPosition(((ManagedCursorContainer)this.ledger.getCursors()).getSlowestReaderPosition());
            ((ManagedLedgerImpl)this.ledger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void readEntryComplete(Entry entry, Object ctx) {
                    try {
                        long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
                        boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
                        if (expired && log.isDebugEnabled()) {
                            log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlogexceeded quota {}", (Object)((ManagedLedgerImpl)PersistentTopic.this.ledger).getSlowestConsumer().getName(), (Object)backlogQuotaLimitInSecond);
                        }
                        future.complete(expired);
                    }
                    catch (Exception e) {
                        log.error("[{}][{}] Error deserializing message for backlog check", (Object)topicName, (Object)e);
                        future.complete(false);
                    }
                    finally {
                        entry.release();
                    }
                }

                @Override
                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                    log.error("[{}][{}] Error reading entry for precise time based  backlog check", (Object)topicName, (Object)exception);
                    future.complete(false);
                }
            }, null);
            return future;
        }
        Long ledgerId = ((ManagedCursorContainer)this.ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
        try {
            MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = this.ledger.getLedgerInfo(ledgerId).get();
            if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0L && ((ManagedLedgerImpl)this.ledger).getClock().millis() - ledgerInfo.getTimestamp() > (long)(backlogQuotaLimitInSecond * 1000)) {
                if (log.isDebugEnabled()) {
                    log.debug("Time based backlog quota exceeded, quota {}, age of ledger slowest cursor currently on {}", (Object)(backlogQuotaLimitInSecond * 1000), (Object)(((ManagedLedgerImpl)this.ledger).getClock().millis() - ledgerInfo.getTimestamp()));
                }
                return CompletableFuture.completedFuture(true);
            }
            return CompletableFuture.completedFuture(false);
        }
        catch (Exception e) {
            log.error("[{}][{}] Error reading entry for precise time based backlog check", (Object)topicName, (Object)e);
            return CompletableFuture.completedFuture(false);
        }
    }

    @Override
    public boolean isReplicated() {
        return !this.replicators.isEmpty();
    }

    public CompletableFuture<MessageId> terminate() {
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        this.ledger.asyncTerminate(new AsyncCallbacks.TerminateCallback(){

            @Override
            public void terminateComplete(Position lastCommittedPosition, Object ctx) {
                PersistentTopic.this.producers.values().forEach(Producer::disconnect);
                PersistentTopic.this.subscriptions.forEach((name, sub) -> sub.topicTerminated());
                PositionImpl lastPosition = (PositionImpl)lastCommittedPosition;
                MessageIdImpl messageId = new MessageIdImpl(lastPosition.getLedgerId(), lastPosition.getEntryId(), -1);
                log.info("[{}] Topic terminated at {}", (Object)PersistentTopic.this.getName(), (Object)messageId);
                future.complete(messageId);
            }

            @Override
            public void terminateFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, null);
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
        Entry entry = null;
        boolean isOldestMessageExpired = false;
        try {
            entry = cursor.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Include);
            if (entry != null) {
                long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
                isOldestMessageExpired = MessageImpl.isEntryExpired((int)((double)messageTTLInSeconds * 1.5), entryTimestamp);
            }
        }
        catch (Exception e) {
            log.warn("[{}] Error while getting the oldest message", (Object)this.topic, (Object)e);
        }
        finally {
            if (entry != null) {
                entry.release();
            }
        }
        return isOldestMessageExpired;
    }

    public CompletableFuture<Void> clearBacklog() {
        log.info("[{}] Clearing backlog on all cursors in the topic.", (Object)this.topic);
        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayList();
        List<String> cursors = this.getSubscriptions().keys();
        cursors.addAll(this.getReplicators().keys());
        for (String cursor : cursors) {
            futures.add(this.clearBacklog(cursor));
        }
        return FutureUtil.waitForAll(futures);
    }

    public CompletableFuture<Void> clearBacklog(String cursorName) {
        log.info("[{}] Clearing backlog for cursor {} in the topic.", (Object)this.topic, (Object)cursorName);
        PersistentSubscription sub = this.getSubscription(cursorName);
        if (sub != null) {
            return sub.clearBacklog();
        }
        PersistentReplicator repl = (PersistentReplicator)this.getPersistentReplicator(cursorName);
        if (repl != null) {
            return repl.clearBacklog();
        }
        return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
    }

    @Override
    public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
        return this.dispatchRateLimiter;
    }

    @Override
    public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
        return Optional.ofNullable(this.brokerService.getBrokerDispatchRateLimiter());
    }

    public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
        return this.subscribeRateLimiter;
    }

    public long getLastPublishedSequenceId(String producerName) {
        return this.messageDeduplication.getLastPublishedSequenceId(producerName);
    }

    @Override
    public Position getLastPosition() {
        return this.ledger.getLastConfirmedEntry();
    }

    @Override
    public CompletableFuture<MessageId> getLastMessageId() {
        final CompletableFuture<MessageId> completableFuture = new CompletableFuture<MessageId>();
        final PositionImpl position = (PositionImpl)this.ledger.getLastConfirmedEntry();
        String name = this.getName();
        final int partitionIndex = TopicName.getPartitionIndex(name);
        if (log.isDebugEnabled()) {
            log.debug("getLastMessageId {}, partitionIndex{}, position {}", new Object[]{name, partitionIndex, position});
        }
        if (position.getEntryId() == -1L) {
            completableFuture.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
            return completableFuture;
        }
        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl)this.ledger;
        if (!ledgerImpl.ledgerExists(position.getLedgerId())) {
            completableFuture.complete(MessageId.earliest);
            return completableFuture;
        }
        ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                try {
                    MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                    if (metadata.hasNumMessagesInBatch()) {
                        completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex, metadata.getNumMessagesInBatch() - 1));
                    } else {
                        completableFuture.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
                    }
                }
                finally {
                    entry.release();
                }
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                completableFuture.completeExceptionally(exception);
            }
        }, null);
        return completableFuture;
    }

    public synchronized void triggerCompaction() throws PulsarServerException, BrokerServiceException.AlreadyRunningException {
        if (!this.currentCompaction.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Compaction already in progress");
        }
        this.currentCompaction = this.brokerService.pulsar().getCompactor().compact(this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized LongRunningProcessStatus compactionStatus() {
        CompletableFuture<Long> current;
        PersistentTopic persistentTopic = this;
        synchronized (persistentTopic) {
            current = this.currentCompaction;
        }
        if (!current.isDone()) {
            return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            if (current.join() == -4273917950L) {
                return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
            }
            return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
        }
        catch (CancellationException | CompletionException e) {
            return LongRunningProcessStatus.forError(e.getMessage());
        }
    }

    public synchronized void triggerOffload(final MessageIdImpl messageId) throws BrokerServiceException.AlreadyRunningException {
        if (!this.currentOffload.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Offload already in progress");
        }
        this.currentOffload = new CompletableFuture();
        final CompletableFuture promise = this.currentOffload;
        log.info("[{}] Starting offload operation at messageId {}", (Object)this.topic, (Object)messageId);
        this.getManagedLedger().asyncOffloadPrefix(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()), new AsyncCallbacks.OffloadCallback(){

            @Override
            public void offloadComplete(Position pos, Object ctx) {
                PositionImpl impl = (PositionImpl)pos;
                log.info("[{}] Completed successfully offload operation at messageId {}", (Object)PersistentTopic.this.topic, (Object)messageId);
                promise.complete(new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1));
            }

            @Override
            public void offloadFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Failed offload operation at messageId {}", new Object[]{PersistentTopic.this.topic, messageId, exception});
                promise.completeExceptionally(exception);
            }
        }, null);
    }

    public synchronized OffloadProcessStatus offloadStatus() {
        if (!this.currentOffload.isDone()) {
            return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            if (this.currentOffload.join() == MessageId.earliest) {
                return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
            }
            return OffloadProcessStatus.forSuccess(this.currentOffload.join());
        }
        catch (CancellationException | CompletionException e) {
            log.warn("Failed to offload", e.getCause());
            return OffloadProcessStatus.forError(e.getMessage());
        }
    }

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return this.hasSchema().thenCompose(hasSchema -> {
            int numActiveConsumers = this.subscriptions.values().stream().mapToInt(subscription -> subscription.getConsumers().size()).sum();
            if (hasSchema.booleanValue() || !this.producers.isEmpty() || numActiveConsumers != 0 || this.ledger.getTotalSize() != 0L) {
                return this.checkSchemaCompatibleForConsumer(schema);
            }
            return this.addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
        });
    }

    public synchronized void checkReplicatedSubscriptionControllerState() {
        AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
        this.subscriptions.forEach((name, subscription) -> {
            if (subscription.isReplicated()) {
                shouldBeEnabled.set(true);
            }
        });
        if (!shouldBeEnabled.get()) {
            log.info("[{}] There are no replicated subscriptions on the topic", (Object)this.topic);
        }
        this.checkReplicatedSubscriptionControllerState(shouldBeEnabled.get());
    }

    private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
        boolean isCurrentlyEnabled = this.replicatedSubscriptionsController.isPresent();
        boolean isEnableReplicatedSubscriptions = this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
        if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
            log.info("[{}] Enabling replicated subscriptions controller", (Object)this.topic);
            this.replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this, this.brokerService.pulsar().getConfiguration().getClusterName()));
        } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
            log.info("[{}] Disabled replicated subscriptions controller", (Object)this.topic);
            this.replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
            this.replicatedSubscriptionsController = Optional.empty();
        }
    }

    void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
        ReplicatedSubscriptionsController ctrl = this.replicatedSubscriptionsController.orElse(null);
        if (ctrl == null) {
            this.checkReplicatedSubscriptionControllerState(true);
            ctrl = this.replicatedSubscriptionsController.get();
        }
        ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);
    }

    public Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
        return this.replicatedSubscriptionsController;
    }

    public CompactedTopic getCompactedTopic() {
        return this.compactedTopic;
    }

    @Override
    public boolean isSystemTopic() {
        return false;
    }

    @Override
    public boolean isPersistent() {
        return true;
    }

    private synchronized void fence() {
        int timeout;
        this.isFenced = true;
        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
        if ((monitoringTask == null || monitoringTask.isDone()) && (timeout = this.brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds()) > 0) {
            this.fencedTopicMonitoringTask = this.brokerService.executor().schedule(this::closeFencedTopicForcefully, (long)timeout, TimeUnit.SECONDS);
        }
    }

    private synchronized void unfence() {
        this.isFenced = false;
        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
        if (monitoringTask != null && !monitoringTask.isDone()) {
            monitoringTask.cancel(false);
        }
    }

    private void closeFencedTopicForcefully() {
        if (this.isFenced) {
            int timeout = this.brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
            if (this.isClosingOrDeleting) {
                log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", new Object[]{this.topic, timeout, this.pendingWriteOps.get()});
            } else {
                log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", new Object[]{this.topic, timeout, this.pendingWriteOps.get()});
                this.close();
            }
        }
    }

    private void fenceTopicToCloseOrDelete() {
        this.isClosingOrDeleting = true;
        this.isFenced = true;
    }

    private void unfenceTopicToResume() {
        this.isFenced = false;
        this.isClosingOrDeleting = false;
    }

    @Override
    public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        this.pendingWriteOps.incrementAndGet();
        if (this.isFenced) {
            publishContext.completed(new BrokerServiceException.TopicFencedException("fenced"), -1L, -1L);
            this.decrementPendingWriteOpsAndCheck();
            return;
        }
        if (this.isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
            publishContext.completed(new BrokerServiceException.NotAllowedException("Exceed maximum message size"), -1L, -1L);
            this.decrementPendingWriteOpsAndCheck();
            return;
        }
        MessageDeduplication.MessageDupStatus status = this.messageDeduplication.isDuplicate(publishContext, headersAndPayload);
        switch (status) {
            case NotDup: {
                ((CompletableFuture)this.transactionBuffer.appendBufferToTxn(txnID, publishContext.getSequenceId(), headersAndPayload).thenAccept(position -> {
                    this.messageDeduplication.recordMessagePersisted(publishContext, (PositionImpl)position);
                    publishContext.completed(null, ((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId());
                    this.decrementPendingWriteOpsAndCheck();
                })).exceptionally(throwable -> {
                    if (!((throwable = throwable.getCause()) instanceof ManagedLedgerException)) {
                        throwable = new ManagedLedgerException((Throwable)throwable);
                    }
                    this.addFailed((ManagedLedgerException)throwable, publishContext);
                    return null;
                });
                break;
            }
            case Dup: {
                publishContext.completed(null, -1L, -1L);
                this.decrementPendingWriteOpsAndCheck();
                break;
            }
            default: {
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1L, -1L);
                this.decrementPendingWriteOpsAndCheck();
            }
        }
    }

    @Override
    public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
        if (0 == txnAction) {
            return this.transactionBuffer.commitTxn(txnID, lowWaterMark);
        }
        if (1 == txnAction) {
            return this.transactionBuffer.abortTxn(txnID, lowWaterMark);
        }
        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Unsupported txnAction " + txnAction));
    }

    @Override
    public CompletableFuture<Void> truncate() {
        return this.ledger.asyncTruncate();
    }

    public long getDelayedDeliveryTickTimeMillis() {
        return this.topicPolicies.getDelayedDeliveryTickTimeMillis().get();
    }

    public boolean isDelayedDeliveryEnabled() {
        return this.topicPolicies.getDelayedDeliveryEnabled().get();
    }

    public int getMaxUnackedMessagesOnSubscription() {
        return this.topicPolicies.getMaxUnackedMessagesOnSubscription().get();
    }

    @Override
    public void onUpdate(TopicPolicies policies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] update topic policy: {}", (Object)this.topic, (Object)policies);
        }
        if (policies == null) {
            return;
        }
        this.updateTopicPolicy(policies);
        Optional<Policies> namespacePolicies = this.getNamespacePolicies();
        this.initializeTopicDispatchRateLimiterIfNeeded(policies);
        this.dispatchRateLimiter.ifPresent(limiter -> {
            if (policies.isDispatchRateSet()) {
                this.dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
            } else {
                this.dispatchRateLimiter.get().updateDispatchRate();
            }
        });
        ArrayList consumerCheckFutures = new ArrayList();
        this.subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
            Dispatcher dispatcher = sub.getDispatcher();
            if (dispatcher != null) {
                dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
            }
        }))));
        ((CompletableFuture)FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
            if (policies.getPublishRate() != null) {
                this.updatePublishDispatcher(policies.getPublishRate());
            } else {
                this.updateMaxPublishRate(namespacePolicies.orElse(null));
            }
            this.initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
            if (this.subscribeRateLimiter.isPresent()) {
                this.subscribeRateLimiter.ifPresent(subscribeRateLimiter -> subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
            }
            this.replicators.forEach((name, replicator) -> replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
            this.checkDeduplicationStatus();
            this.preCreateSubscriptionForCompactionIfNeeded();
            this.checkPersistencePolicies();
        })).exceptionally(e -> {
            Throwable t = e instanceof CompletionException ? e.getCause() : e;
            log.error("[{}] update topic policy error: {}", new Object[]{this.topic, t.getMessage(), t});
            return null;
        });
    }

    private Optional<Policies> getNamespacePolicies() {
        return DispatchRateLimiter.getPolicies(this.brokerService, this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
        Optional<DispatchRateLimiter> optional = this.dispatchRateLimiter;
        synchronized (optional) {
            if (!this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, DispatchRateLimiter.Type.TOPIC));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
        if (!policies.isPresent()) {
            return;
        }
        Optional<SubscribeRateLimiter> optional = this.subscribeRateLimiter;
        synchronized (optional) {
            if (!this.subscribeRateLimiter.isPresent() && policies.get().getSubscribeRate() != null && policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer > 0) {
                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
            } else if (!policies.get().isSubscribeRateSet() || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
                this.subscribeRateLimiter = Optional.empty();
            }
        }
    }

    protected CompletableFuture<Void> initTopicPolicy() {
        if (this.brokerService.pulsar().getConfig().isSystemTopicEnabled() && this.brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
            return CompletableFuture.completedFuture(null).thenRunAsync(() -> this.onUpdate(this.brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(this.topic))), this.brokerService.getTopicOrderedExecutor());
        }
        return CompletableFuture.completedFuture(null);
    }

    @VisibleForTesting
    public MessageDeduplication getMessageDeduplication() {
        return this.messageDeduplication;
    }

    private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
        if (StringUtils.isNotEmpty(subscriptionName) && this.getSubscription(subscriptionName) != null) {
            return false;
        }
        Integer maxSubsPerTopic = this.topicPolicies.getMaxSubscriptionsPerTopic().get();
        if (maxSubsPerTopic != null && maxSubsPerTopic > 0) {
            return this.subscriptions != null && this.subscriptions.size() >= (long)maxSubsPerTopic.intValue();
        }
        return false;
    }

    public boolean checkSubscriptionTypesEnable(CommandSubscribe.SubType subType) {
        EnumSet<CommandSubscribe.SubType> subTypesEnabled = this.topicPolicies.getSubscriptionTypesEnabled().get();
        return subTypesEnabled != null && subTypesEnabled.contains((Object)subType);
    }

    public TransactionBufferStats getTransactionBufferStats() {
        return this.transactionBuffer.getStats();
    }

    public TransactionPendingAckStats getTransactionPendingAckStats(String subName) {
        return this.subscriptions.get(subName).getTransactionPendingAckStats();
    }

    public PositionImpl getMaxReadPosition() {
        return this.transactionBuffer.getMaxReadPosition();
    }

    public boolean isTxnAborted(TxnID txnID) {
        return this.transactionBuffer.isTxnAborted(txnID);
    }

    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        return this.transactionBuffer.getTransactionInBufferStats(txnID);
    }

    @Override
    protected boolean isTerminated() {
        return this.ledger.isTerminated();
    }

    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
        return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
    }

    public CompletableFuture<ManagedLedger> getPendingAckManagedLedger(String subName) {
        PersistentSubscription subscription = this.subscriptions.get(subName);
        if (subscription == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionNotFoundException(this.topic + " not found subscription : " + subName));
        }
        return subscription.getPendingAckManageLedger();
    }

    public long getLastDataMessagePublishedTimestamp() {
        return this.lastDataMessagePublishedTimestamp;
    }

    public TransactionBuffer getTransactionBuffer() {
        return this.transactionBuffer;
    }

    private static class TopicStatsHelper {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgThrottlingFailure;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        public final ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap();

        public TopicStatsHelper() {
            this.reset();
        }

        public void reset() {
            this.averageMsgSize = 0.0;
            this.aggMsgRateIn = 0.0;
            this.aggMsgThroughputIn = 0.0;
            this.aggMsgRateOut = 0.0;
            this.aggMsgThrottlingFailure = 0.0;
            this.aggMsgThroughputOut = 0.0;
            this.remotePublishersStats.clear();
        }
    }
}

