package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerServiceTest.class */
public class BrokerServiceTest extends BrokerTestBase {
    // Relative paths ("./src/test/resources/certificate/...") of the TLS test material.
    // The same path strings are passed to the broker configuration and to the client
    // authentication parameters in the TLS tests of this class.

    // Server certificate file.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    // Server private key file.
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    // Client certificate file.
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
    // Client private key file.
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

    /**
     * TestNG {@code @BeforeClass} hook: runs once before the tests of this class and
     * delegates to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the inherited setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * TestNG {@code @AfterClass} hook: runs once after the tests of this class and
     * delegates to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks that {@code getOrCreateTopic} fails before the topic has been looked up and
     * succeeds afterwards.
     *
     * <p>The first call is expected to complete exceptionally with a cause that is an
     * {@link IOException} (the test's own message says "NS is not owned"). After a lookup
     * through the admin client the same call must succeed and the topic reference must be
     * available.
     *
     * <p>All assertions run on the test thread: the futures are awaited with a timeout
     * instead of asserting inside completion callbacks, where an {@link AssertionError}
     * would be swallowed by the future and a missed {@code countDown()} could block the
     * test forever.
     *
     * @throws Exception on timeout, interruption, or an unexpected admin/broker error
     */
    @Test
    public void testOwnedNsCheck() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";
        final BrokerService brokerService = this.pulsar.getBrokerService();

        // Phase 1: loading the topic must fail with an IOException cause.
        try {
            brokerService.getOrCreateTopic(topicName).get(30L, TimeUnit.SECONDS);
            Assert.fail("should fail as NS is not owned");
        } catch (ExecutionException e) {
            // get() unwraps to the exception the future was completed with, which is the
            // same object the original callback inspected via getCause().
            Assert.assertTrue(e.getCause() instanceof IOException,
                    "expected an IOException cause but got: " + e.getCause());
        }

        // Phase 2: after a lookup the topic must load successfully.
        this.admin.lookups().lookupTopic(topicName);
        try {
            brokerService.getOrCreateTopic(topicName).get(30L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            Assert.fail("should not fail", e);
        }
        Assert.assertNotNull(brokerService.getTopicReference(topicName));
    }

    /**
     * Publishes ten messages to a persistent topic with one subscriber and checks the
     * topic, publisher, subscription and consumer statistics reported by
     * {@code PersistentTopic.getStats()}.
     *
     * <p>Sequence: subscribe, verify an empty backlog, publish 10 messages, verify the
     * backlog of 10 and positive, mutually consistent rates, acknowledge everything,
     * close the consumer and verify the backlog is back to 0.
     *
     * <p>The fixed 100 ms sleeps are kept from the original test; they presumably give
     * asynchronous broker-side processing time to finish before stats are sampled.
     *
     * @throws Exception on any client or broker error
     */
    @Test
    public void testBrokerServicePersistentTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";
        final String subName = "successSub";

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // Freshly subscribed: exactly one subscription, one consumer, nothing pending.
        rolloverPerIntervalStats();
        TopicStats stats = topicRef.getStats();
        SubscriptionStats subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(stats.subscriptions.keySet().size(), 1);
        Assert.assertEquals(subStats.msgBacklog, 0L);
        Assert.assertEquals(subStats.consumers.size(), 1);

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        Thread.sleep(100L);

        // After publishing: backlog of 10 and non-zero rates.
        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 10L);
        Assert.assertEquals(stats.publishers.size(), 1);

        PublisherStats publisher = (PublisherStats) stats.publishers.get(0);
        Assert.assertTrue(publisher.msgRateIn > 0.0);
        Assert.assertTrue(publisher.msgThroughputIn > 0.0);
        Assert.assertTrue(publisher.averageMsgSize > 0.0);
        Assert.assertNotNull(publisher.getClientVersion());

        // With a single publisher the topic-level inbound figures must match it.
        Assert.assertEquals(Double.valueOf(stats.msgRateIn), Double.valueOf(publisher.msgRateIn));
        Assert.assertEquals(Double.valueOf(stats.msgThroughputIn), Double.valueOf(publisher.msgThroughputIn));
        Assert.assertTrue(Math.abs(stats.averageMsgSize - publisher.averageMsgSize) < 1.0E-6d);

        ConsumerStats consumerStats = (ConsumerStats) subStats.consumers.get(0);
        Assert.assertTrue(consumerStats.msgRateOut > 0.0);
        Assert.assertTrue(consumerStats.msgThroughputOut > 0.0);

        // With a single consumer the subscription- and topic-level outbound figures must match it.
        Assert.assertEquals(Double.valueOf(subStats.msgRateOut), Double.valueOf(consumerStats.msgRateOut));
        Assert.assertEquals(Double.valueOf(subStats.msgThroughputOut),
                Double.valueOf(consumerStats.msgThroughputOut));
        Assert.assertEquals(Double.valueOf(stats.msgRateOut), Double.valueOf(consumerStats.msgRateOut));
        Assert.assertEquals(Double.valueOf(stats.msgThroughputOut),
                Double.valueOf(consumerStats.msgThroughputOut));
        Assert.assertNotNull(consumerStats.getClientVersion());

        for (int i = 0; i < 10; i++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);

        // Everything acknowledged: the backlog must be drained.
        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 0L);

        // The original never released the producer.
        producer.close();
    }

    /**
     * Same scenario as the plain topic-stats test, but on a {@code Shared} subscription,
     * and additionally checks the redelivery rate and unacknowledged-message counters.
     *
     * <p>Sequence: subscribe, publish 10 messages, verify stats while nothing has been
     * acknowledged (redelivery rate 0, 10 unacked messages), request redelivery and
     * verify a positive redelivery rate, then acknowledge everything and verify the
     * backlog is 0.
     *
     * <p>The fixed 100 ms sleeps are kept from the original test; they presumably give
     * asynchronous broker-side processing time to finish before stats are sampled.
     *
     * @throws Exception on any client or broker error
     */
    @Test
    public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successSharedTopic";
        final String subName = "successSharedSub";

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // Freshly subscribed: exactly one subscription, one consumer, nothing pending.
        rolloverPerIntervalStats();
        TopicStats stats = topicRef.getStats();
        SubscriptionStats subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(stats.subscriptions.keySet().size(), 1);
        Assert.assertEquals(subStats.msgBacklog, 0L);
        Assert.assertEquals(subStats.consumers.size(), 1);

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        Thread.sleep(100L);

        // After publishing, before any acknowledgement.
        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 10L);
        Assert.assertEquals(stats.publishers.size(), 1);

        PublisherStats publisher = (PublisherStats) stats.publishers.get(0);
        Assert.assertTrue(publisher.msgRateIn > 0.0);
        Assert.assertTrue(publisher.msgThroughputIn > 0.0);
        Assert.assertTrue(publisher.averageMsgSize > 0.0);
        Assert.assertEquals(Double.valueOf(stats.msgRateIn), Double.valueOf(publisher.msgRateIn));
        Assert.assertEquals(Double.valueOf(stats.msgThroughputIn), Double.valueOf(publisher.msgThroughputIn));
        Assert.assertTrue(Math.abs(stats.averageMsgSize - publisher.averageMsgSize) < 1.0E-6d);

        ConsumerStats consumerStats = (ConsumerStats) subStats.consumers.get(0);
        Assert.assertTrue(consumerStats.msgRateOut > 0.0);
        Assert.assertTrue(consumerStats.msgThroughputOut > 0.0);
        // Nothing has been redelivered yet and all ten messages are still unacknowledged.
        Assert.assertEquals(Double.valueOf(subStats.msgRateRedeliver), Double.valueOf(0.0));
        Assert.assertEquals(consumerStats.unackedMessages, 10);

        // With a single consumer, subscription- and topic-level figures must match it.
        Assert.assertEquals(Double.valueOf(subStats.msgRateOut), Double.valueOf(consumerStats.msgRateOut));
        Assert.assertEquals(Double.valueOf(subStats.msgThroughputOut),
                Double.valueOf(consumerStats.msgThroughputOut));
        Assert.assertEquals(Double.valueOf(subStats.msgRateRedeliver),
                Double.valueOf(consumerStats.msgRateRedeliver));
        Assert.assertEquals(Double.valueOf(stats.msgRateOut), Double.valueOf(consumerStats.msgRateOut));
        Assert.assertEquals(Double.valueOf(stats.msgThroughputOut),
                Double.valueOf(consumerStats.msgThroughputOut));
        Assert.assertEquals(subStats.unackedMessages, consumerStats.unackedMessages);

        consumer.redeliverUnacknowledgedMessages();
        Thread.sleep(100L);

        // After the redelivery request the redelivery rate must be positive.
        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertTrue(subStats.msgRateRedeliver > 0.0);
        Assert.assertEquals(Double.valueOf(subStats.msgRateRedeliver),
                Double.valueOf(((ConsumerStats) subStats.consumers.get(0)).msgRateRedeliver));

        for (int i = 0; i < 10; i++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);

        // Everything acknowledged: the backlog must be drained.
        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = (SubscriptionStats) stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 0L);

        // The original never released the producer.
        producer.close();
    }

    /**
     * Produces and consumes ten messages on {@code prop/ns-abc} and then checks that the
     * broker metrics returned by the admin client contain at least one entry whose
     * {@code "dimensions"} member mentions {@code prop/ns-abc}.
     *
     * <p>The original kept two flags that tested the identical condition and hid every
     * exception thrown while reading an entry; entries that are not JSON objects or have
     * no {@code "dimensions"} member are now skipped explicitly instead.
     *
     * @throws Exception on any client, admin or broker error
     */
    @Test
    public void testBrokerStatsMetrics() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String namespace = "prop/ns-abc";

        BrokerStats brokerStats = this.admin.brokerStats();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("newSub")
                .subscribe();
        Thread.sleep(100L);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        Thread.sleep(100L);
        for (int i = 0; i < 10; i++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);

        JsonArray metrics = brokerStats.getMetrics();
        boolean namespaceDimensionFound = false;
        for (int i = 0; i < metrics.size() && !namespaceDimensionFound; i++) {
            if (!metrics.get(i).isJsonObject()) {
                continue;
            }
            JsonObject metric = metrics.get(i).getAsJsonObject();
            if (!metric.has("dimensions")) {
                continue;
            }
            namespaceDimensionFound = metric.get("dimensions").toString().contains(namespace);
        }
        Assert.assertTrue(namespaceDimensionFound,
                "no metrics entry with a dimension mentioning " + namespace);

        // Released only after the metrics check so the sampled state matches the original test.
        producer.close();
        Thread.sleep(100L);
    }

    /**
     * Creates two namespaces with two producers each and checks that the per-topic broker
     * stats returned by the admin client list every topic under its namespace, its bundle
     * range and the {@code "persistent"} section.
     *
     * <p>Afterwards the producers are closed and the topics and namespaces created by
     * this test are deleted again.
     *
     * @throws Exception on any client, admin or broker error
     */
    @Test
    public void testBrokerServiceNamespaceStats() throws Exception {
        ArrayList<String> namespaces = Lists.newArrayList("prop/stats1", "prop/stats2");
        ArrayList<Producer<byte[]>> producers = Lists.newArrayList();
        BrokerStats brokerStats = this.admin.brokerStats();

        for (String namespace : namespaces) {
            this.admin.namespaces().createNamespace(namespace, 4);
            this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
            producers.add(this.pulsarClient.newProducer()
                    .topic(String.format("persistent://%s/topic1", namespace)).create());
            producers.add(this.pulsarClient.newProducer()
                    .topic(String.format("persistent://%s/topic2", namespace)).create());
        }

        rolloverPerIntervalStats();
        JsonObject topicStats = brokerStats.getTopics();
        Assert.assertEquals(topicStats.size(), 2, topicStats.toString());

        for (String namespace : namespaces) {
            JsonObject namespaceStats = topicStats.getAsJsonObject(namespace);
            for (String topic : this.admin.namespaces().getTopics(namespace)) {
                String bundleRange =
                        this.pulsar.getNamespaceService().getBundle(TopicName.get(topic)).getBundleRange();
                JsonObject persistentStats =
                        namespaceStats.getAsJsonObject(bundleRange).getAsJsonObject("persistent");
                // A direct key lookup replaces the manual scan over all entries.
                Assert.assertTrue(persistentStats.has(topic));
            }
        }

        for (Producer<byte[]> producer : producers) {
            producer.close();
        }
        for (String namespace : namespaces) {
            for (String topic : this.admin.namespaces().getTopics(namespace)) {
                this.admin.topics().delete(topic);
            }
            this.admin.namespaces().deleteNamespace(namespace);
        }
    }

    /**
     * With authentication disabled and no TLS settings applied by this test, a plain-text
     * client must be able to subscribe while a TLS client must fail with a message
     * containing {@code "ConnectException"}.
     *
     * <p>Each client and consumer is scoped with try-with-resources so it is closed
     * exactly once on every path. The original closed the already-closed first client in
     * the expected-failure branch and never closed the TLS client.
     *
     * @throws Exception if the broker restart fails
     */
    @Test
    public void testTlsDisabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        this.conf.setAuthenticationEnabled(false);
        restartBroker();

        // Case 1: plain-text access must succeed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: TLS access must be refused.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it is not caught below.
            Assert.fail("TLS connection should fail");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("ConnectException"));
        }
    }

    /**
     * Restarts the broker with TLS ports and a server certificate (authentication
     * disabled) and checks four client configurations:
     * <ol>
     *   <li>plain-text client: must succeed;</li>
     *   <li>TLS client allowing insecure connections: must succeed;</li>
     *   <li>TLS client disallowing insecure connections and without a trust certificate:
     *       must fail with a message containing {@code "General OpenSslEngine problem"};</li>
     *   <li>TLS client disallowing insecure connections but trusting the server
     *       certificate: must succeed.</li>
     * </ol>
     *
     * <p>The decompiled original fell through from the expected exception of case 3
     * straight into {@code Assert.fail("should fail")} and read a consumer variable that
     * was not definitely assigned; the cases are therefore written out independently.
     *
     * @throws Exception if the broker restart fails
     */
    @Test
    public void testTlsEnabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePortTls(Integer.valueOf(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Integer.valueOf(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        restartBroker();

        // Case 1: plain-text access must succeed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: TLS with insecure connections allowed must succeed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 3: TLS without a trust certificate and insecure connections disallowed must fail.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(false)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it is not caught below.
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("General OpenSslEngine problem"));
        }

        // Case 4: TLS trusting the server certificate must succeed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(false)
                     .tlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with TLS authentication enabled and insecure TLS connections
     * allowed, then checks that:
     * <ol>
     *   <li>a TLS client without authentication data is rejected with a message
     *       containing {@code "Authentication required"};</li>
     *   <li>a TLS client authenticating with the client certificate and key can
     *       subscribe.</li>
     * </ol>
     *
     * <p>Clients and consumers are scoped with try-with-resources so each is closed
     * exactly once; the original closed the first client twice on the expected path and
     * would hit a {@code NullPointerException} if building the client failed.
     *
     * @throws Exception if the broker restart fails
     */
    @Test
    public void testTlsAuthAllowInsecure() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Integer.valueOf(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Integer.valueOf(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.conf.setTlsAllowInsecureConnection(true);
        restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

        // Case 1: no authentication data, so the subscribe must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it is not caught below.
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Authentication required"));
        }

        // Case 2: TLS authentication with the client certificate must succeed.
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                         .authentication(auth)
                         .serviceUrl(this.brokerUrlTls.toString())
                         .enableTls(true)
                         .allowTlsInsecureConnection(true)
                         .statsInterval(0L, TimeUnit.SECONDS)
                         .build();
                 Consumer<byte[]> consumer = client.newConsumer()
                         .topic(topicName)
                         .subscriptionName(subName)
                         .subscribe()) {
                Assert.assertNotNull(consumer);
            }
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with TLS authentication enabled and insecure TLS connections
     * disallowed, then checks that both a client without authentication data and a
     * client presenting the client certificate are rejected with a message containing
     * {@code "Authentication required"}.
     *
     * <p>Clients and consumers are scoped with try-with-resources; the original never
     * closed the second client because its handlers closed the stale first client
     * instead.
     *
     * @throws Exception if the broker restart fails
     */
    @Test
    public void testTlsAuthDisallowInsecure() throws Exception {
        final String topicName = "persistent://prop/my-ns/newTopic";
        final String subName = "newSub";

        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Integer.valueOf(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Integer.valueOf(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.conf.setTlsAllowInsecureConnection(false);
        restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

        // Case 1: no authentication data, so the subscribe must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it is not caught below.
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Authentication required"));
        }

        // Case 2: the client certificate is presented but must still be rejected.
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                         .authentication(auth)
                         .serviceUrl(this.brokerUrlTls.toString())
                         .enableTls(true)
                         .allowTlsInsecureConnection(true)
                         .statsInterval(0L, TimeUnit.SECONDS)
                         .build();
                 Consumer<byte[]> consumer = client.newConsumer()
                         .topic(topicName)
                         .subscriptionName(subName)
                         .subscribe()) {
                Assert.fail("should fail");
            }
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Authentication required"));
        }
    }

    /**
     * Restarts the broker with TLS authentication enabled, insecure TLS connections
     * disallowed and the client certificate configured as the broker's trust
     * certificate, then checks that:
     * <ol>
     *   <li>a TLS client without authentication data is rejected with a message
     *       containing {@code "Authentication required"};</li>
     *   <li>a TLS client authenticating with the client certificate and key can
     *       subscribe.</li>
     * </ol>
     *
     * <p>The decompiled original fell through from the expected exception of case 1
     * straight into {@code Assert.fail("should fail")} and read a consumer variable that
     * was not definitely assigned; the two cases are therefore written out independently.
     *
     * @throws Exception if the broker restart fails
     */
    @Test
    public void testTlsAuthUseTrustCert() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Integer.valueOf(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Integer.valueOf(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
        restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

        // Case 1: no authentication data, so the subscribe must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it is not caught below.
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Authentication required"));
        }

        // Case 2: TLS authentication with the trusted client certificate must succeed.
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                         .authentication(auth)
                         .serviceUrl(this.brokerUrlTls.toString())
                         .enableTls(true)
                         .allowTlsInsecureConnection(true)
                         .statsInterval(0L, TimeUnit.SECONDS)
                         .build();
                 Consumer<byte[]> consumer = client.newConsumer()
                         .topic(topicName)
                         .subscriptionName(subName)
                         .subscribe()) {
                Assert.assertNotNull(consumer);
            }
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Exercises client-side lookup throttling on a dedicated client built with
     * {@code maxConcurrentLookupRequests(1)} and {@code maxLookupRequests(2)}.
     *
     * <p>Two concurrent {@code subscribeAsync} calls are expected to succeed, while a batch of
     * three is expected to fail with an exception whose cause is
     * {@link PulsarClientException.TooManyRequestsException}.
     *
     * <p>The dedicated client is always closed when the test ends, so neither the client nor
     * any consumer that did manage to subscribe in the failing batch is leaked.
     *
     * @throws Exception if the lookup URL cannot be built or the client cannot be created/closed
     */
    @Test
    public void testLookupThrottlingForClientByClient() throws Exception {
        final String topic = "persistent://prop/ns-abc/newTopic";
        final String lookupUrl = new URI("pulsar://localhost:" + this.BROKER_PORT).toString();
        PulsarClient throttledClient = PulsarClient.builder()
                .serviceUrl(lookupUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .maxConcurrentLookupRequests(1)
                .maxLookupRequests(2)
                .build();
        try {
            // Phase 1: two subscriptions issued back to back must both complete.
            try {
                CompletableFuture<?> first =
                        throttledClient.newConsumer().topic(topic).subscriptionName("mysub1").subscribeAsync();
                CompletableFuture<?> second =
                        throttledClient.newConsumer().topic(topic).subscriptionName("mysub2").subscribeAsync();
                ((Consumer) first.get()).close();
                ((Consumer) second.get()).close();
            } catch (Exception e) {
                Assert.fail("Subscribe should success with 2 requests");
            }

            // Phase 2: three subscriptions issued back to back must be rejected.
            try {
                CompletableFuture<?> third =
                        throttledClient.newConsumer().topic(topic).subscriptionName("mysub11").subscribeAsync();
                CompletableFuture<?> fourth =
                        throttledClient.newConsumer().topic(topic).subscriptionName("mysub22").subscribeAsync();
                CompletableFuture<?> fifth =
                        throttledClient.newConsumer().topic(topic).subscriptionName("mysub33").subscribeAsync();
                ((Consumer) third.get()).close();
                ((Consumer) fourth.get()).close();
                ((Consumer) fifth.get()).close();
                // Assert.fail raises an AssertionError, which the catch below does not intercept.
                Assert.fail("It should fail as throttling should only receive 2 requests");
            } catch (Exception e) {
                if (!(e.getCause() instanceof PulsarClientException.TooManyRequestsException)) {
                    Assert.fail("Subscribe should fail with TooManyRequestsException");
                }
            }
        } finally {
            // Previously the client was never closed; release it on every exit path.
            throttledClient.close();
        }
    }

    /**
     * Creates a namespace and a topic in it, updates the state of the topic's bundle to
     * {@code false} in the ownership cache, and then expects
     * {@code loadOrCreatePersistentTopic} to fail with a cause of
     * {@link BrokerServiceException.ServiceUnitNotReadyException}.
     *
     * @throws Exception if namespace or topic setup fails
     */
    @Test
    public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
        final String namespace = "prop/disableBundle";
        final String topic = "persistent://prop/disableBundle/my-topic";

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        TopicName parsedTopic = TopicName.get(topic);

        // Open and immediately close a producer on the topic before touching the bundle state.
        this.pulsarClient.newProducer().topic(topic).create().close();

        this.pulsar.getNamespaceService()
                .getOwnershipCache()
                .updateBundleState(this.pulsar.getNamespaceService().getBundle(parsedTopic), false);

        try {
            this.pulsar.getBrokerService().loadOrCreatePersistentTopic(topic, true).get();
            // Reaching this line means loading succeeded; the AssertionError escapes the catch below.
            Assert.fail("Topic creation should fail due to disable bundle");
        } catch (Exception loadFailure) {
            boolean rejectedAsNotReady =
                    loadFailure.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException;
            if (!rejectedAsNotReady) {
                Assert.fail("Topic creation should fail with ServiceUnitNotReadyException");
            }
        }
    }

    /**
     * Checks that a failure while fetching the managed-ledger config does not leave
     * {@code getOrCreateTopic} hanging.
     *
     * <p>A spied {@link BrokerService} is stubbed so that {@code getManagedLedgerConfig} returns a
     * future already failed with a {@link NullPointerException}. Topic creation is started on a
     * separate single-thread executor and must complete exceptionally, with that
     * {@code NullPointerException} as the cause, within one second.
     */
    @Test(timeOut = 3000)
    public void testTopicFailureShouldNotHaveDeadLock() {
        // Create and close a producer on a sibling topic in the same namespace first
        // (presumably so this broker owns the namespace bundle - confirm against the broker setup).
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/ownBundleTopic").create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        BrokerService spiedService = Mockito.spy(this.pulsar.getBrokerService());

        // Every managed-ledger config lookup on the spy yields an already-failed future.
        CompletableFuture<ManagedLedgerConfig> failedConfig = new CompletableFuture<>();
        failedConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
        Mockito.doReturn(failedConfig).when(spiedService).getManagedLedgerConfig((TopicName) Matchers.anyObject());

        // Mirrors the outcome of the asynchronous topic creation back to the test thread.
        CompletableFuture<Void> topicCreation = new CompletableFuture<>();
        executor.submit(() -> {
            spiedService.getOrCreateTopic("persistent://prop/ns-abc/deadLockTestTopic")
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(error -> {
                        topicCreation.completeExceptionally(error.getCause());
                        return null;
                    });
        });

        try {
            topicCreation.get(1L, TimeUnit.SECONDS);
            // The stubbed config lookup always fails, so a successful creation means the test
            // did not exercise the failure path; do not let it pass vacuously.
            Assert.fail("topic creation should have failed");
        } catch (InterruptedException | TimeoutException e) {
            Assert.fail("there is a dead-lock and it should have been prevented");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NullPointerException);
        } finally {
            // Single cleanup point for every exit path, including assertion failures.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that a failure while opening the managed ledger does not leave
     * {@code getOrCreateTopic} hanging.
     *
     * <p>A spied {@link BrokerService} is stubbed to return a default
     * {@link ManagedLedgerConfig}, and an already-failed future is injected (via reflection) into
     * the {@code ledgers} map of the managed-ledger factory under the key used for the test
     * topic. Topic creation is started on a separate single-thread executor and must complete
     * exceptionally, with {@link BrokerServiceException.PersistenceException} as the cause,
     * within one second.
     *
     * @throws Exception if the {@code ledgers} field cannot be accessed reflectively
     */
    @Test
    public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
        // Create and close a producer on a sibling topic in the same namespace first
        // (presumably so this broker owns the namespace bundle - confirm against the broker setup).
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/ownBundleTopic").create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        BrokerService spiedService = Mockito.spy(this.pulsar.getBrokerService());

        // The config lookup itself succeeds; the failure is injected one step later.
        CompletableFuture<ManagedLedgerConfig> readyConfig = new CompletableFuture<>();
        readyConfig.complete(new ManagedLedgerConfig());
        Mockito.doReturn(readyConfig).when(spiedService).getManagedLedgerConfig((TopicName) Matchers.anyObject());

        // Mirrors the outcome of the asynchronous topic creation back to the test thread.
        CompletableFuture<Void> topicCreation = new CompletableFuture<>();

        // Reach into the factory's private "ledgers" map and plant a failed future for the topic.
        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        @SuppressWarnings("unchecked") // the field's declared value type is not visible from this test
        ConcurrentHashMap<String, CompletableFuture<?>> ledgers =
                (ConcurrentHashMap<String, CompletableFuture<?>>) ledgersField.get(this.pulsar.getManagedLedgerFactory());
        CompletableFuture<Object> failedLedgerOpen = new CompletableFuture<>();
        failedLedgerOpen.completeExceptionally(new ManagedLedgerException("ledger opening failed"));
        ledgers.put("prop/ns-abc/persistent/deadLockTestTopic", failedLedgerOpen);

        executor.submit(() -> {
            spiedService.getOrCreateTopic("persistent://prop/ns-abc/deadLockTestTopic")
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(error -> {
                        topicCreation.completeExceptionally(error.getCause());
                        return null;
                    });
        });

        try {
            topicCreation.get(1L, TimeUnit.SECONDS);
            // The injected ledger future always fails, so a successful creation means the test
            // did not exercise the failure path; do not let it pass vacuously.
            Assert.fail("topic creation should have failed");
        } catch (InterruptedException | TimeoutException e) {
            Assert.fail("there is a dead-lock and it should have been prevented");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.PersistenceException.class);
        } finally {
            // Single cleanup point for every exit path, including assertion failures.
            executor.shutdownNow();
            // NOTE(review): clear() empties the whole map, not only the injected entry - confirm
            // that no other test relies on ledger futures cached in this factory.
            ledgers.clear();
        }
    }

    /**
     * Creates a namespace with three bundles through the admin API, clears the local policies
     * cache, and checks that the local policies read back for that namespace report three bundles.
     *
     * @throws Exception if the namespace cannot be created or the policies cannot be read
     */
    @Test
    public void testCreateNamespacePolicy() throws Exception {
        final String namespace = "prop/testPolicy";
        final int expectedBundles = 3;

        System.err.println("----------------");
        this.admin.namespaces().createNamespace(namespace, new BundlesData(expectedBundles));

        String localPoliciesPath = PulsarWebResource.joinPath("/admin/local-policies", namespace);

        // Clear the cache before the lookup so the entry is not served from what is already cached.
        this.pulsar.getLocalZkCacheService().policiesCache().clear();
        Optional<?> cachedPolicies = this.pulsar.getLocalZkCacheService().policiesCache().get(localPoliciesPath);

        Assert.assertTrue(cachedPolicies.isPresent());
        LocalPolicies localPolicies = (LocalPolicies) cachedPolicies.get();
        Assert.assertEquals(localPolicies.bundles.numBundles, expectedBundles);
    }
}
