package org.apache.pulsar.broker;

import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for the per-broker SLA monitoring namespaces.
 *
 * <p>{@link #setup()} starts a {@link LocalBookkeeperEnsemble} and {@value #BROKER_COUNT}
 * {@link PulsarService} instances, then creates one {@code sla-monitor/my-cluster/<address>:<webPort>}
 * namespace per broker. The test methods look up a topic in each of those namespaces and assert
 * which broker the lookup resolves to.
 */
@Test(groups = {"broker"})
public class SLAMonitoringTest {
    private static final Logger log = LoggerFactory.getLogger(SLAMonitoringTest.class);

    /** Number of brokers started by {@link #setup()}. */
    private static final int BROKER_COUNT = 5;
    private static final String CLUSTER_NAME = "my-cluster";
    private static final String SLA_TENANT = "sla-monitor";
    private static final String TOPIC_NAME = "my-topic";

    LocalBookkeeperEnsemble bkEnsemble;
    ExecutorService executor;
    private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
    private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
    private final URL[] brokerUrls = new URL[BROKER_COUNT];
    private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    private final ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];

    /**
     * Starts the bookkeeper ensemble and all brokers, records the ports each broker actually bound,
     * and creates the cluster, the tenant and one SLA namespace per broker.
     *
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeClass(alwaysRun = true)
    void setup() throws Exception {
        executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        log.info("---- Initializing SLAMonitoringTest -----");
        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();

        for (int i = 0; i < BROKER_COUNT; i++) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setBrokerShutdownTimeoutMs(0L);
            config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
            // Port 0 is requested for both listeners; the ports actually bound are read back below.
            config.setBrokerServicePort(Optional.of(0));
            config.setWebServicePort(Optional.of(0));
            config.setClusterName(CLUSTER_NAME);
            config.setAdvertisedAddress("localhost");
            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
            config.setDefaultNumberOfNamespaceBundles(1);
            config.setLoadBalancerEnabled(false);
            configurations[i] = config;

            pulsarServices[i] = new PulsarService(config);
            pulsarServices[i].start();

            brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
            brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
            brokerUrls[i] = new URL(pulsarServices[i].getWebServiceAddress());
            pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
        }

        Thread.sleep(100L);

        createTenant(pulsarAdmins[BROKER_COUNT - 1]);
        for (int i = 0; i < BROKER_COUNT; i++) {
            pulsarAdmins[0].namespaces().createNamespace(slaNamespace(i));
        }
    }

    /**
     * Creates the {@code my-cluster} cluster (through the first broker's admin client, using the
     * given admin's service URL) and the {@code sla-monitor} tenant (through the given admin).
     *
     * @param pulsarAdmin admin client whose service URL is registered for the cluster and which
     *                    is used to create the tenant
     * @throws PulsarAdminException if either admin call fails
     */
    private void createTenant(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
        ClusterData clusterData = ClusterData.builder().serviceUrl(pulsarAdmin.getServiceUrl()).build();
        pulsarAdmins[0].clusters().createCluster(CLUSTER_NAME, clusterData);

        HashSet<String> allowedClusters = new HashSet<>();
        allowedClusters.add(CLUSTER_NAME);
        TenantInfo tenantInfo = TenantInfo.builder()
                .adminRoles(Collections.singleton(""))
                .allowedClusters(allowedClusters)
                .build();
        pulsarAdmin.tenants().createTenant(SLA_TENANT, tenantInfo);
    }

    /**
     * Stops everything {@link #setup()} started.
     *
     * <p>Because this runs even when {@link #setup()} failed partway, every resource is
     * null-checked so that teardown does not mask the original failure with a
     * {@link NullPointerException}.
     *
     * @throws Exception if closing an admin client, a broker or the ensemble fails
     */
    @AfterClass(alwaysRun = true)
    public void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
        for (int i = 0; i < BROKER_COUNT; i++) {
            if (pulsarAdmins[i] != null) {
                pulsarAdmins[i].close();
            }
            if (pulsarServices[i] != null) {
                pulsarServices[i].close();
            }
        }
        if (bkEnsemble != null) {
            bkEnsemble.stop();
        }
    }

    /** Returns the SLA namespace name {@code sla-monitor/my-cluster/<address>:<webPort>} for a broker. */
    private String slaNamespace(int brokerIndex) {
        return String.format("%s/%s/%s:%s", SLA_TENANT, CLUSTER_NAME,
                pulsarServices[brokerIndex].getAdvertisedAddress(), brokerWebServicePorts[brokerIndex]);
    }

    /** Returns the persistent topic {@code my-topic} inside the SLA namespace of a broker. */
    private String slaTopic(int brokerIndex) {
        return String.format("persistent://%s/%s", slaNamespace(brokerIndex), TOPIC_NAME);
    }

    /** Returns {@code pulsar://<address>:<nativePort>} for a broker, using the recorded native port. */
    private String brokerServiceUrl(int brokerIndex) {
        return "pulsar://" + pulsarServices[brokerIndex].getAdvertisedAddress() + ":"
                + brokerNativeBrokerPorts[brokerIndex];
    }

    /** Asserts that registering the SLA namespace on the first broker succeeds, repeated per broker count. */
    @Test
    public void testOwnershipAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; i++) {
            try {
                // NOTE(review): every iteration queries broker 0, not broker i. Kept as-is because
                // it is unclear whether that is intentional; confirm before changing to [i].
                Assert.assertTrue(pulsarServices[0].getNamespaceService().registerSLANamespace());
            } catch (PulsarServerException e) {
                log.error("Exception occurred", e);
                Assert.fail("SLA Namespace should have been owned by the broker, Exception.", e);
            }
        }
    }

    /**
     * After triggering lookups via {@link #testOwnershipViaAdminAfterSetup()}, asserts through each
     * admin client that the cluster reports {@value #BROKER_COUNT} active brokers and that the
     * first listed broker owns 3 namespaces.
     */
    @Test
    public void testOwnedNamespaces() {
        testOwnershipViaAdminAfterSetup();
        for (int i = 0; i < BROKER_COUNT; i++) {
            try {
                List<String> activeBrokers = pulsarAdmins[i].brokers().getActiveBrokers(CLUSTER_NAME);
                Assert.assertNotNull(activeBrokers);
                Assert.assertEquals(activeBrokers.size(), BROKER_COUNT);
                int ownedCount = pulsarAdmins[i].brokers()
                        .getOwnedNamespaces(CLUSTER_NAME, activeBrokers.get(0)).size();
                Assert.assertEquals(ownedCount, 3);
            } catch (Exception e) {
                log.error("Failed to query owned namespaces through admin {}", i, e);
                Assert.fail("Hearbeat namespace and SLA namespace should be owned by the broker", e);
            }
        }
    }

    /**
     * Looks up a topic in each broker's SLA namespace through the first broker's admin client and
     * asserts that the lookup resolves to that broker's own service URL.
     */
    @Test
    public void testOwnershipViaAdminAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; i++) {
            try {
                String owner = pulsarAdmins[0].lookups().lookupTopic(slaTopic(i));
                Assert.assertEquals(owner, brokerServiceUrl(i));
            } catch (Exception e) {
                log.error("Lookup failed for the SLA namespace of broker {}", i, e);
                Assert.fail("SLA Namespace should have been owned by the broker(" + brokerServiceUrl(i) + ")", e);
            }
        }
    }

    /**
     * Closes the middle broker, asserts that a lookup for its SLA topic no longer resolves to it,
     * restarts a broker from the same configuration, asserts the lookup resolves to the restarted
     * broker, and finally closes that broker again.
     */
    @Test
    public void testUnloadIfBrokerCrashes() {
        final int crashIndex = BROKER_COUNT / 2;
        log.info("Trying to close the broker at index = {}", crashIndex);
        try {
            pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to close the broker at index {}", crashIndex, e);
            Assert.fail("Should be a able to close the broker index " + crashIndex + " Exception: " + e, e);
        }

        // The topic name is built from the web port recorded during setup, before any restart.
        String topicName = slaTopic(crashIndex);
        log.info("Lookup for namespace {}", topicName);

        String broker = null;
        try {
            broker = pulsarAdmins[BROKER_COUNT - 1].lookups().lookupTopic(topicName);
            log.info("{} Namespace is owned by {}", topicName, broker);
            Assert.assertNotEquals(broker, brokerServiceUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup failed after closing the broker at index {}", crashIndex, e);
            Assert.fail("The SLA Monitor namespace should be owned by some other broker", e);
        }

        try {
            pulsarServices[crashIndex] = new PulsarService(configurations[crashIndex]);
            pulsarServices[crashIndex].start();
            // Refresh the recorded native port from the restarted broker for the assertion below.
            brokerNativeBrokerPorts[crashIndex] = pulsarServices[crashIndex].getBrokerListenPort().get();
        } catch (PulsarServerException e) {
            log.error("Failed to restart the broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to start without exception", e);
        }

        try {
            broker = pulsarAdmins[0].lookups().lookupTopic(topicName);
            log.info("{} Namespace is re-owned by {}", topicName, broker);
            Assert.assertEquals(broker, brokerServiceUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup failed after restarting the broker at index {}", crashIndex, e);
            Assert.fail("The SLA Monitor namespace should be reowned by the broker" + broker, e);
        }

        try {
            pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to close the restarted broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to stop without exception", e);
        }
    }
}
