package org.apache.pulsar.broker.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerBookieIsolationTest.class */
public class BrokerBookieIsolationTest {
    private LocalBookkeeperEnsemble bkEnsemble;
    private PulsarService pulsarService;
    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BrokerBookieIsolationTest.class);

    @BeforeMethod
    protected void setup() throws Exception {
        this.bkEnsemble = new LocalBookkeeperEnsemble(4, 0, () -> {
            return 0;
        });
        this.bkEnsemble.start();
    }

    @AfterMethod
    protected void cleanup() throws Exception {
        if (this.pulsarService != null) {
            this.pulsarService.close();
        }
        this.bkEnsemble.stop();
    }

    @Test
    public void testBookieIsolation() throws Exception {
        String format = String.format("%s/%s/%s", "tenant1", "use", "ns1");
        String format2 = String.format("%s/%s/%s", "tenant1", "use", "ns2");
        String format3 = String.format("%s/%s/%s", "tenant1", "use", "ns3");
        String format4 = String.format("%s/%s/%s", "tenant1", "use", "ns4");
        BookieServer[] bookies = this.bkEnsemble.getBookies();
        ZooKeeper zkClient = this.bkEnsemble.getZkClient();
        HashSet newHashSet = Sets.newHashSet(bookies[0].getLocalAddress(), bookies[1].getLocalAddress());
        HashSet newHashSet2 = Sets.newHashSet(bookies[2].getLocalAddress(), bookies[3].getLocalAddress());
        setDefaultIsolationGroup("default-group", zkClient, newHashSet);
        setDefaultIsolationGroup("tenant1-isolation", zkClient, newHashSet2);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        serviceConfiguration.setClusterName("use");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setBookkeeperClientIsolationGroups("default-group");
        serviceConfiguration.setManagedLedgerDefaultEnsembleSize(2);
        serviceConfiguration.setManagedLedgerDefaultWriteQuorum(2);
        serviceConfiguration.setManagedLedgerDefaultAckQuorum(2);
        serviceConfiguration.setAllowAutoTopicCreationType("non-partitioned");
        int i = 100 / 20;
        serviceConfiguration.setManagedLedgerMaxEntriesPerLedger(20);
        serviceConfiguration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.pulsarService = new PulsarService(serviceConfiguration);
        this.pulsarService.start();
        PulsarAdmin build = PulsarAdmin.builder().serviceHttpUrl(this.pulsarService.getWebServiceAddress()).build();
        build.clusters().createCluster("use", new ClusterData(this.pulsarService.getWebServiceAddress()));
        build.tenants().createTenant("tenant1", new TenantInfo((Set) null, Sets.newHashSet("use")));
        build.namespaces().createNamespace(format);
        build.namespaces().createNamespace(format2);
        build.namespaces().createNamespace(format3);
        build.namespaces().createNamespace(format4);
        build.namespaces().setBookieAffinityGroup(format2, new BookieAffinityGroupData("tenant1-isolation", (String) null));
        build.namespaces().setBookieAffinityGroup(format3, new BookieAffinityGroupData("tenant1-isolation", (String) null));
        build.namespaces().setBookieAffinityGroup(format4, new BookieAffinityGroupData("tenant1-isolation", (String) null));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format2), new BookieAffinityGroupData("tenant1-isolation", (String) null));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format3), new BookieAffinityGroupData("tenant1-isolation", (String) null));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format4), new BookieAffinityGroupData("tenant1-isolation", (String) null));
        try {
            build.namespaces().getBookieAffinityGroup(format);
        } catch (PulsarAdminException.NotFoundException e) {
        }
        PulsarClient build2 = PulsarClient.builder().serviceUrl(this.pulsarService.getBrokerServiceUrl()).statsInterval(-1L, TimeUnit.SECONDS).build();
        PersistentTopic createTopicAndPublish = createTopicAndPublish(build2, format, "topic1", 100);
        PersistentTopic createTopicAndPublish2 = createTopicAndPublish(build2, format2, "topic1", 100);
        PersistentTopic createTopicAndPublish3 = createTopicAndPublish(build2, format3, "topic1", 100);
        PersistentTopic createTopicAndPublish4 = createTopicAndPublish(build2, format4, "topic1", 100);
        Bookie bookie = bookies[0].getBookie();
        Field declaredField = Bookie.class.getDeclaredField("ledgerManager");
        declaredField.setAccessible(true);
        LedgerManager ledgerManager = (LedgerManager) declaredField.get(bookie);
        ManagedLedgerImpl managedLedger = createTopicAndPublish.getManagedLedger();
        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger.getLedgersInfoAsList(), newHashSet);
        ManagedLedgerImpl managedLedger2 = createTopicAndPublish2.getManagedLedger();
        Assert.assertEquals(managedLedger2.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger2.getLedgersInfoAsList(), newHashSet2);
        ManagedLedgerImpl managedLedger3 = createTopicAndPublish3.getManagedLedger();
        Assert.assertEquals(managedLedger3.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger3.getLedgersInfoAsList(), newHashSet2);
        ManagedLedgerImpl managedLedger4 = createTopicAndPublish4.getManagedLedger();
        Assert.assertEquals(managedLedger4.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger4.getLedgersInfoAsList(), newHashSet2);
        Assert.assertEquals(this.pulsarService.getManagedLedgerClientFactory().getBkEnsemblePolicyToBookKeeperMap().size(), 1);
    }

    @Test
    public void testBookieIsilationWithSecondaryGroup() throws Exception {
        String format = String.format("%s/%s/%s", "tenant1", "use", "ns1");
        String format2 = String.format("%s/%s/%s", "tenant1", "use", "ns2");
        String format3 = String.format("%s/%s/%s", "tenant1", "use", "ns3");
        String format4 = String.format("%s/%s/%s", "tenant1", "use", "ns4");
        BookieServer[] bookies = this.bkEnsemble.getBookies();
        ZooKeeper zkClient = this.bkEnsemble.getZkClient();
        HashSet newHashSet = Sets.newHashSet(bookies[0].getLocalAddress(), bookies[1].getLocalAddress());
        HashSet newHashSet2 = Sets.newHashSet(bookies[2].getLocalAddress(), bookies[3].getLocalAddress());
        HashSet newHashSet3 = Sets.newHashSet(new BookieSocketAddress("1.1.1.1:1111"), new BookieSocketAddress("1.1.1.1:1112"));
        setDefaultIsolationGroup("default-group", zkClient, newHashSet);
        setDefaultIsolationGroup("tenant1-isolation-primary", zkClient, newHashSet3);
        setDefaultIsolationGroup("tenant1-isolation=secondary", zkClient, newHashSet2);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        serviceConfiguration.setClusterName("use");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setBookkeeperClientIsolationGroups("default-group");
        serviceConfiguration.setManagedLedgerDefaultEnsembleSize(2);
        serviceConfiguration.setManagedLedgerDefaultWriteQuorum(2);
        serviceConfiguration.setManagedLedgerDefaultAckQuorum(2);
        serviceConfiguration.setAllowAutoTopicCreationType("non-partitioned");
        int i = 100 / 20;
        serviceConfiguration.setManagedLedgerMaxEntriesPerLedger(20);
        serviceConfiguration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.pulsarService = new PulsarService(serviceConfiguration);
        this.pulsarService.start();
        PulsarAdmin build = PulsarAdmin.builder().serviceHttpUrl(this.pulsarService.getWebServiceAddress()).build();
        build.clusters().createCluster("use", new ClusterData(this.pulsarService.getWebServiceAddress()));
        build.tenants().createTenant("tenant1", new TenantInfo((Set) null, Sets.newHashSet("use")));
        build.namespaces().createNamespace(format);
        build.namespaces().createNamespace(format2);
        build.namespaces().createNamespace(format3);
        build.namespaces().createNamespace(format4);
        build.namespaces().setBookieAffinityGroup(format2, new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        build.namespaces().setBookieAffinityGroup(format3, new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        build.namespaces().setBookieAffinityGroup(format4, new BookieAffinityGroupData("tenant1-isolation-primary", (String) null));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format2), new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format3), new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format4), new BookieAffinityGroupData("tenant1-isolation-primary", (String) null));
        try {
            build.namespaces().getBookieAffinityGroup(format);
        } catch (PulsarAdminException.NotFoundException e) {
        }
        PulsarClient build2 = PulsarClient.builder().serviceUrl(this.pulsarService.getBrokerServiceUrl()).statsInterval(-1L, TimeUnit.SECONDS).build();
        PersistentTopic createTopicAndPublish = createTopicAndPublish(build2, format, "topic1", 100);
        PersistentTopic createTopicAndPublish2 = createTopicAndPublish(build2, format2, "topic1", 100);
        PersistentTopic createTopicAndPublish3 = createTopicAndPublish(build2, format3, "topic1", 100);
        Bookie bookie = bookies[0].getBookie();
        Field declaredField = Bookie.class.getDeclaredField("ledgerManager");
        declaredField.setAccessible(true);
        LedgerManager ledgerManager = (LedgerManager) declaredField.get(bookie);
        ManagedLedgerImpl managedLedger = createTopicAndPublish.getManagedLedger();
        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger.getLedgersInfoAsList(), newHashSet);
        ManagedLedgerImpl managedLedger2 = createTopicAndPublish2.getManagedLedger();
        Assert.assertEquals(managedLedger2.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger2.getLedgersInfoAsList(), newHashSet2);
        ManagedLedgerImpl managedLedger3 = createTopicAndPublish3.getManagedLedger();
        Assert.assertEquals(managedLedger3.getLedgersInfoAsList().size(), i);
        assertAffinityBookies(ledgerManager, managedLedger3.getLedgersInfoAsList(), newHashSet2);
        Assert.assertEquals(this.pulsarService.getManagedLedgerClientFactory().getBkEnsemblePolicyToBookKeeperMap().size(), 1);
        try {
            createTopicAndPublish(build2, format4, "topic1", 1);
            Assert.fail("should have failed due to not enough non-faulty bookie");
        } catch (PulsarClientException.BrokerPersistenceException e2) {
        }
    }

    @Test
    public void testDeleteIsolationGroup() throws Exception {
        String format = String.format("%s/%s/%s", "tenant1", "use", "ns2");
        String format2 = String.format("%s/%s/%s", "tenant1", "use", "ns3");
        BookieServer[] bookies = this.bkEnsemble.getBookies();
        ZooKeeper zkClient = this.bkEnsemble.getZkClient();
        HashSet newHashSet = Sets.newHashSet(bookies[0].getLocalAddress(), bookies[1].getLocalAddress());
        HashSet newHashSet2 = Sets.newHashSet(bookies[2].getLocalAddress(), bookies[3].getLocalAddress());
        setDefaultIsolationGroup("default-group", zkClient, newHashSet);
        setDefaultIsolationGroup("tenant1-isolation-primary", zkClient, Sets.newHashSet());
        setDefaultIsolationGroup("tenant1-isolation=secondary", zkClient, newHashSet2);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        serviceConfiguration.setClusterName("use");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setBookkeeperClientIsolationGroups("default-group");
        serviceConfiguration.setManagedLedgerDefaultEnsembleSize(2);
        serviceConfiguration.setManagedLedgerDefaultWriteQuorum(2);
        serviceConfiguration.setManagedLedgerDefaultAckQuorum(2);
        serviceConfiguration.setAllowAutoTopicCreationType("non-partitioned");
        serviceConfiguration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.pulsarService = new PulsarService(serviceConfiguration);
        this.pulsarService.start();
        PulsarAdmin build = PulsarAdmin.builder().serviceHttpUrl(this.pulsarService.getWebServiceAddress()).build();
        build.clusters().createCluster("use", new ClusterData(this.pulsarService.getWebServiceAddress()));
        build.tenants().createTenant("tenant1", new TenantInfo((Set) null, Sets.newHashSet("use")));
        build.namespaces().createNamespace(format);
        build.namespaces().createNamespace(format2);
        build.namespaces().setBookieAffinityGroup(format, new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        build.namespaces().setBookieAffinityGroup(format2, new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format), new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format2), new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
        build.namespaces().deleteBookieAffinityGroup(format);
        try {
            build.namespaces().getBookieAffinityGroup(format);
            Assert.fail("should have fail due to affinity-group not present");
        } catch (PulsarAdminException.NotFoundException e) {
        }
        Assert.assertEquals(build.namespaces().getBookieAffinityGroup(format2), new BookieAffinityGroupData("tenant1-isolation-primary", "tenant1-isolation=secondary"));
    }

    private void assertAffinityBookies(LedgerManager ledgerManager, List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> list, Set<BookieSocketAddress> set) throws Exception {
        Iterator<MLDataFormats.ManagedLedgerInfo.LedgerInfo> it = list.iterator();
        while (it.hasNext()) {
            LedgerMetadata ledgerMetadata = (LedgerMetadata) ((Versioned) ledgerManager.readLedgerMetadata(it.next().getLedgerId()).get()).getValue();
            HashSet newHashSet = Sets.newHashSet();
            newHashSet.addAll((Collection) ledgerMetadata.getAllEnsembles().values().iterator().next());
            Assert.assertEquals(newHashSet.size(), set.size());
            newHashSet.removeAll(set);
            Assert.assertEquals(newHashSet.size(), 0);
        }
    }

    private Topic createTopicAndPublish(PulsarClient pulsarClient, String str, String str2, int i) throws Exception {
        String format = String.format("persistent://%s/%s", str, str2);
        pulsarClient.newConsumer().topic(format).subscriptionName("my-subscriber-name").subscribe().close();
        Producer<byte[]> create = pulsarClient.newProducer().topic(format).sendTimeout(5, TimeUnit.SECONDS).create();
        for (int i2 = 0; i2 < i; i2++) {
            create.send(("my-message-" + i2).getBytes());
        }
        create.close();
        return (Topic) this.pulsarService.getBrokerService().getTopicReference(format).get();
    }

    private void setDefaultIsolationGroup(String str, ZooKeeper zooKeeper, Set<BookieSocketAddress> set) throws Exception {
        BookiesRackConfiguration bookiesRackConfiguration = null;
        try {
            byte[] data = zooKeeper.getData("/bookies", false, (Stat) null);
            System.out.println(new String(data));
            bookiesRackConfiguration = (BookiesRackConfiguration) this.jsonMapper.readValue(data, BookiesRackConfiguration.class);
        } catch (KeeperException.NoNodeException e) {
            zooKeeper.create("/bookies", "".getBytes(), Acl, CreateMode.PERSISTENT);
        }
        if (bookiesRackConfiguration == null) {
            bookiesRackConfiguration = new BookiesRackConfiguration();
        }
        HashMap newHashMap = Maps.newHashMap();
        for (BookieSocketAddress bookieSocketAddress : set) {
            newHashMap.put(bookieSocketAddress.toString(), new BookieInfo("use", bookieSocketAddress.getHostName() + ":" + bookieSocketAddress.getPort()));
        }
        bookiesRackConfiguration.put(str, newHashMap);
        zooKeeper.setData("/bookies", this.jsonMapper.writeValueAsBytes(bookiesRackConfiguration), -1);
    }
}
