package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.class */
public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
    private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;
    private static final String NAMESPACE1 = "system-topic/namespace-1";
    private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
    private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
    private static final String NAMESPACE2 = "system-topic/namespace-2";
    private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
    private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2");
    private static final String NAMESPACE3 = "system-topic/namespace-3";
    private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1");
    private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2");

    /* renamed from: org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesServiceTest$1TopicPolicyListenerImpl, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest$1TopicPolicyListenerImpl.class */
    class C1TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> {
        C1TopicPolicyListenerImpl() {
        }

        public void onUpdate(TopicPolicies topicPolicies) {
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.internalSetup();
        prepareData();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException {
        TopicName topicName = TopicName.get("test");
        CompletableFuture<Void> thenRunAsync = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
            for (int i = 0; i < 100; i++) {
                C1TopicPolicyListenerImpl c1TopicPolicyListenerImpl = new C1TopicPolicyListenerImpl();
                this.systemTopicBasedTopicPoliciesService.registerListener(topicName, c1TopicPolicyListenerImpl);
                Assert.assertNotNull(this.systemTopicBasedTopicPoliciesService.listeners.get(topicName));
                Assert.assertTrue(((List) this.systemTopicBasedTopicPoliciesService.listeners.get(topicName)).size() >= 1);
                this.systemTopicBasedTopicPoliciesService.unregisterListener(topicName, c1TopicPolicyListenerImpl);
            }
        });
        for (int i = 0; i < 100; i++) {
            C1TopicPolicyListenerImpl c1TopicPolicyListenerImpl = new C1TopicPolicyListenerImpl();
            this.systemTopicBasedTopicPoliciesService.registerListener(topicName, c1TopicPolicyListenerImpl);
            Assert.assertNotNull(this.systemTopicBasedTopicPoliciesService.listeners.get(topicName));
            Assert.assertTrue(((List) this.systemTopicBasedTopicPoliciesService.listeners.get(topicName)).size() >= 1);
            this.systemTopicBasedTopicPoliciesService.unregisterListener(topicName, c1TopicPolicyListenerImpl);
        }
        thenRunAsync.get();
        Assert.assertFalse(this.systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
    }

    @Test
    public void testGetPolicy() throws ExecutionException, InterruptedException, BrokerServiceException.TopicPoliciesCacheNotInitException {
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, TopicPolicies.builder().maxConsumerPerTopic(10).build()).get();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).booleanValue());
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
        });
        TopicPolicies build = TopicPolicies.builder().maxProducerPerTopic(1).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, build).get();
        TopicPolicies build2 = TopicPolicies.builder().maxProducerPerTopic(2).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, build2).get();
        TopicPolicies build3 = TopicPolicies.builder().maxProducerPerTopic(3).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, build3).get();
        TopicPolicies build4 = TopicPolicies.builder().maxProducerPerTopic(4).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, build4).get();
        TopicPolicies build5 = TopicPolicies.builder().maxProducerPerTopic(5).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, build5).get();
        TopicPolicies build6 = TopicPolicies.builder().maxProducerPerTopic(6).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, build6).get();
        Awaitility.await().untilAsserted(() -> {
            TopicPolicies topicPolicies = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
            TopicPolicies topicPolicies2 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
            TopicPolicies topicPolicies3 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
            TopicPolicies topicPolicies4 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
            TopicPolicies topicPolicies5 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
            TopicPolicies topicPolicies6 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
            Assert.assertEquals(topicPolicies, build);
            Assert.assertEquals(topicPolicies2, build2);
            Assert.assertEquals(topicPolicies3, build3);
            Assert.assertEquals(topicPolicies4, build4);
            Assert.assertEquals(topicPolicies5, build5);
            Assert.assertEquals(topicPolicies6, build6);
        });
        Assert.assertEquals(this.systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6L);
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1)));
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2)));
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3)));
        build.setMaxProducerPerTopic(101);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, build);
        build2.setMaxProducerPerTopic(102);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, build2);
        build2.setMaxProducerPerTopic(103);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, build2);
        build.setMaxProducerPerTopic(104);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, build);
        build2.setMaxProducerPerTopic(105);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, build2);
        build.setMaxProducerPerTopic(106);
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, build);
        Awaitility.await().untilAsserted(() -> {
            TopicPolicies topicPolicies = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
            TopicPolicies topicPolicies2 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
            Assert.assertEquals(build, topicPolicies);
            Assert.assertEquals(build2, topicPolicies2);
        });
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2)));
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1)));
        Assert.assertTrue(this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3)));
        Assert.assertEquals(build, (TopicPolicies) this.systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get());
    }

    @Test
    public void testCacheCleanup() throws Exception {
        String str = "persistent://system-topic/namespace-1/test" + UUID.randomUUID();
        TopicName topicName = TopicName.get(str);
        this.admin.topics().createPartitionedTopic(str, 3);
        this.pulsarClient.newProducer().topic(str).create().close();
        this.admin.topics().setMaxConsumers(str, 1000);
        Awaitility.await().untilAsserted(() -> {
            AssertJUnit.assertNotNull(this.admin.topics().getMaxConsumers(str));
        });
        Map policiesCache = this.systemTopicBasedTopicPoliciesService.getPoliciesCache();
        Map listeners = this.systemTopicBasedTopicPoliciesService.getListeners();
        AssertJUnit.assertNotNull(policiesCache.get(topicName));
        AssertJUnit.assertEquals(((TopicPolicies) policiesCache.get(topicName)).getMaxConsumerPerTopic().intValue(), 1000);
        AssertJUnit.assertNotNull(((List) listeners.get(topicName)).get(0));
        this.admin.topics().deletePartitionedTopic(str, true);
        this.admin.namespaces().unload(NAMESPACE1);
        AssertJUnit.assertNull(policiesCache.get(topicName));
        AssertJUnit.assertNull(listeners.get(topicName));
    }

    @Test
    public void testListenerCleanupByPartition() throws Exception {
        String str = "persistent://system-topic/namespace-1/test" + UUID.randomUUID();
        TopicName topicName = TopicName.get(str);
        this.admin.topics().createPartitionedTopic(str, 3);
        this.pulsarClient.newProducer().topic(str).create().close();
        Map listeners = this.systemTopicBasedTopicPoliciesService.getListeners();
        Awaitility.await().untilAsserted(() -> {
            AssertJUnit.assertEquals(((List) listeners.get(topicName)).size(), 3);
        });
        this.admin.topics().unload(topicName.getPartition(0).toString());
        AssertJUnit.assertEquals(((List) listeners.get(topicName)).size(), 2);
        this.admin.topics().unload(topicName.getPartition(1).toString());
        AssertJUnit.assertEquals(((List) listeners.get(topicName)).size(), 1);
        this.admin.topics().unload(topicName.getPartition(2).toString());
        AssertJUnit.assertNull(listeners.get(topicName));
    }

    private void prepareData() throws PulsarAdminException {
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant("system-topic", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.namespaces().createNamespace(NAMESPACE2);
        this.admin.namespaces().createNamespace(NAMESPACE3);
        this.admin.lookups().lookupTopic(TOPIC1.toString());
        this.admin.lookups().lookupTopic(TOPIC2.toString());
        this.admin.lookups().lookupTopic(TOPIC3.toString());
        this.admin.lookups().lookupTopic(TOPIC4.toString());
        this.admin.lookups().lookupTopic(TOPIC5.toString());
        this.admin.lookups().lookupTopic(TOPIC6.toString());
        this.systemTopicBasedTopicPoliciesService = this.pulsar.getTopicPoliciesService();
    }

    @Test
    public void testGetPolicyTimeout() throws Exception {
        SystemTopicBasedTopicPoliciesService topicPoliciesService = this.pulsar.getTopicPoliciesService();
        Awaitility.await().untilAsserted(() -> {
            AssertJUnit.assertTrue(((Boolean) topicPoliciesService.policyCacheInitMap.get(TOPIC1.getNamespaceObject())).booleanValue());
        });
        topicPoliciesService.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            topicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, new BackoffBuilder().setInitialTime(500L, TimeUnit.MILLISECONDS).setMandatoryStop(5000L, TimeUnit.MILLISECONDS).setMax(1000L, TimeUnit.MILLISECONDS).create(), this.pulsar.getExecutor(), false).get();
        } catch (Exception e) {
            AssertJUnit.assertTrue(e.getCause() instanceof BrokerServiceException.TopicPoliciesCacheNotInitException);
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        AssertJUnit.assertTrue("actual:" + currentTimeMillis2, currentTimeMillis2 >= 4000);
    }

    @Test
    public void testCreatSystemTopicClientWithRetry() throws Exception {
        SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) Mockito.spy(this.pulsar.getTopicPoliciesService());
        Field declaredField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("namespaceEventsSystemTopicFactory");
        declaredField.setAccessible(true);
        NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory = (NamespaceEventsSystemTopicFactory) Mockito.spy((NamespaceEventsSystemTopicFactory) declaredField.get(systemTopicBasedTopicPoliciesService));
        SystemTopicClient systemTopicClient = (SystemTopicClient) Mockito.mock(TopicPoliciesSystemTopicClient.class);
        ((NamespaceEventsSystemTopicFactory) Mockito.doReturn(systemTopicClient).when(namespaceEventsSystemTopicFactory)).createTopicPoliciesSystemTopicClient((NamespaceName) ArgumentMatchers.any());
        declaredField.set(systemTopicBasedTopicPoliciesService, namespaceEventsSystemTopicFactory);
        SystemTopicClient.Reader reader = (SystemTopicClient.Reader) Mockito.mock(SystemTopicClient.Reader.class);
        ((SystemTopicClient) Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("test"))).doReturn(CompletableFuture.completedFuture(reader)).when(systemTopicClient)).newReaderAsync();
        AssertJUnit.assertEquals((SystemTopicClient.Reader) systemTopicBasedTopicPoliciesService.createSystemTopicClientWithRetry((NamespaceName) null).get(), reader);
    }

    @Test
    public void testGetTopicPoliciesWithRetry() throws Exception {
        Field declaredField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
        declaredField.setAccessible(true);
        ((Map) declaredField.get(this.systemTopicBasedTopicPoliciesService)).remove(NamespaceName.get(NAMESPACE1));
        Field declaredField2 = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
        declaredField2.setAccessible(true);
        ((Map) declaredField2.get(this.systemTopicBasedTopicPoliciesService)).remove(NamespaceName.get(NAMESPACE1));
        Backoff create = new BackoffBuilder().setInitialTime(500L, TimeUnit.MILLISECONDS).setMandatoryStop(5000L, TimeUnit.MILLISECONDS).setMax(1000L, TimeUnit.MILLISECONDS).create();
        final TopicPolicies build = TopicPolicies.builder().maxConsumerPerTopic(10).build();
        Executors.newScheduledThreadPool(1).schedule(new Runnable() { // from class: org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesServiceTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    SystemTopicBasedTopicPoliciesServiceTest.this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(SystemTopicBasedTopicPoliciesServiceTest.TOPIC1, build).get();
                } catch (Exception e) {
                }
            }
        }, 2000L, TimeUnit.MILLISECONDS);
        Awaitility.await().untilAsserted(() -> {
            Optional optional = (Optional) this.systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, create, this.pulsar.getExecutor(), false).get();
            Assert.assertTrue(optional.isPresent());
            if (optional.isPresent()) {
                Assert.assertEquals(optional.get(), build);
            }
        });
    }
}
