package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/BacklogQuotaManager.class */
public class BacklogQuotaManager {
    private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);
    private final BacklogQuota defaultQuota;
    private final ZooKeeperDataCache<Policies> zkCache;

    public BacklogQuotaManager(PulsarService pulsarService) {
        this.defaultQuota = new BacklogQuota(pulsarService.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024, pulsarService.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
        this.zkCache = pulsarService.getConfigurationCache().policiesCache();
    }

    public BacklogQuota getDefaultQuota() {
        return this.defaultQuota;
    }

    public BacklogQuota getBacklogQuota(String str, String str2) {
        try {
            return (BacklogQuota) this.zkCache.get(str2).map(policies -> {
                return policies.backlog_quota_map.getOrDefault(BacklogQuota.BacklogQuotaType.destination_storage, this.defaultQuota);
            }).orElse(this.defaultQuota);
        } catch (Exception e) {
            log.error(String.format("Failed to read policies data, will apply the default backlog quota: namespace=%s", str), e);
            return this.defaultQuota;
        }
    }

    public long getBacklogQuotaLimit(String str) {
        return getBacklogQuota(str, AdminResource.path("policies", str)).getLimit();
    }

    public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
        String namespace = TopicName.get(persistentTopic.getName()).getNamespace();
        BacklogQuota backlogQuota = getBacklogQuota(namespace, AdminResource.path("policies", namespace));
        log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(), backlogQuota.getPolicy());
        switch (backlogQuota.getPolicy()) {
            case consumer_backlog_eviction:
                dropBacklog(persistentTopic, backlogQuota);
                return;
            case producer_exception:
            case producer_request_hold:
                disconnectProducers(persistentTopic);
                return;
            default:
                return;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:22:0x0172 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:26:0x005a A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void dropBacklog(org.apache.pulsar.broker.service.persistent.PersistentTopic r11, org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota r12) {
        /*
            Method dump skipped, instructions count: 412
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.broker.service.BacklogQuotaManager.dropBacklog(org.apache.pulsar.broker.service.persistent.PersistentTopic, org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota):void");
    }

    private void disconnectProducers(PersistentTopic persistentTopic) {
        ArrayList newArrayList = Lists.newArrayList();
        persistentTopic.getProducers().forEach(producer -> {
            log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer", producer.getProducerName(), persistentTopic.getName());
            newArrayList.add(producer.disconnect());
        });
        FutureUtil.waitForAll(newArrayList).thenRun(() -> {
            log.info("All producers on topic [{}] are disconnected", persistentTopic.getName());
        }).exceptionally(th -> {
            log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), th);
            return null;
        });
    }
}
