/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.loadbalance;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ExtensibleLoadManagerTest
extends TestRetrySupport {
    private static final Logger log = LoggerFactory.getLogger(ExtensibleLoadManagerTest.class);
    private static final int NUM_BROKERS = 3;
    private static final String DEFAULT_TENANT = "my-tenant";
    private static final String DEFAULT_NAMESPACE = "my-tenant/my-namespace";
    private static final String nsSuffix = "-anti-affinity-enabled";
    private final String clusterName = "MultiLoadManagerTest-" + UUID.randomUUID();
    private final PulsarClusterSpec spec = PulsarClusterSpec.builder().clusterName(this.clusterName).numBrokers(3).build();
    private PulsarCluster pulsarCluster = null;
    private String hosts;
    private PulsarAdmin admin;

    @BeforeClass(alwaysRun=true)
    public void setup() throws Exception {
        this.incrementSetupNumber();
        HashMap<String, String> brokerEnvs = new HashMap<String, String>();
        brokerEnvs.put("loadManagerClassName", "org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl");
        brokerEnvs.put("loadBalancerLoadSheddingStrategy", "org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
        brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
        brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
        brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
        this.spec.brokerEnvs(brokerEnvs);
        this.pulsarCluster = PulsarCluster.forSpec(this.spec);
        this.pulsarCluster.start();
        this.admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build();
        Assert.assertEquals((int)this.admin.brokers().getActiveBrokers(this.clusterName).size(), (int)3);
        this.admin.tenants().createTenant(DEFAULT_TENANT, (TenantInfo)new TenantInfoImpl(new HashSet(), Set.of(this.pulsarCluster.getClusterName())));
        this.admin.namespaces().createNamespace(DEFAULT_NAMESPACE, 100);
    }

    @AfterClass(alwaysRun=true)
    public void cleanup() {
        this.markCurrentSetupNumberCleaned();
        if (this.pulsarCluster != null) {
            this.pulsarCluster.stop();
            this.pulsarCluster = null;
        }
        if (this.admin != null) {
            this.admin.close();
            this.admin = null;
        }
    }

    @BeforeMethod(alwaysRun=true)
    public void startBroker() {
        if (this.pulsarCluster != null) {
            this.pulsarCluster.getBrokers().forEach(brokerContainer -> {
                if (!brokerContainer.isRunning()) {
                    brokerContainer.start();
                }
            });
            String topicName = "persistent://my-tenant/my-namespace/startBrokerCheck";
            Awaitility.await().atMost(120L, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
                for (BrokerContainer brokerContainer : this.pulsarCluster.getBrokers()) {
                    PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(brokerContainer.getHttpServiceUrl()).build();
                    try {
                        if (admin.brokers().getActiveBrokers(this.clusterName).size() != 3) {
                            Boolean bl = false;
                            return bl;
                        }
                        try {
                            admin.topics().createPartitionedTopic(topicName, 10);
                        }
                        catch (PulsarAdminException.ConflictException conflictException) {
                            // empty catch block
                        }
                        admin.lookups().lookupPartitionedTopic(topicName);
                    }
                    finally {
                        if (admin == null) continue;
                        admin.close();
                    }
                }
                return true;
            });
        }
    }

    @Test(timeOut=40000L)
    public void testConcurrentLookups() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/testConcurrentLookups";
        ArrayList<PulsarAdmin> admins = new ArrayList<PulsarAdmin>();
        int numAdminForBroker = 10;
        for (int i = 0; i < numAdminForBroker; ++i) {
            admins.add(PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build());
        }
        this.admin.topics().createPartitionedTopic(topicName, 100);
        ExecutorService executor = Executors.newFixedThreadPool(admins.size());
        CountDownLatch latch = new CountDownLatch(admins.size());
        CopyOnWriteArrayList result = new CopyOnWriteArrayList();
        for (PulsarAdmin admin : admins) {
            executor.execute(() -> {
                try {
                    result.add(admin.lookups().lookupPartitionedTopic(topicName));
                }
                catch (PulsarAdminException e) {
                    log.error("Lookup partitioned topic failed.", (Throwable)e);
                }
                latch.countDown();
            });
        }
        latch.await();
        Assert.assertEquals((int)result.size(), (int)admins.size());
        for (int i = 1; i < admins.size(); ++i) {
            Assert.assertEquals((Map)((Map)result.get(i - 1)), (Map)((Map)result.get(i)));
        }
        admins.forEach(a -> a.close());
        executor.shutdown();
    }

    @Test(timeOut=30000L)
    public void testTransferAdminApi() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/testUnloadAdminApi";
        this.createNonPartitionedTopicAndRetry(topicName);
        String broker = this.admin.lookups().lookupTopic(topicName);
        int index = this.extractBrokerIndex(broker);
        String bundleRange = this.admin.lookups().getBundleRange(topicName);
        try {
            this.admin.namespaces().unloadNamespaceBundle(DEFAULT_NAMESPACE, bundleRange, this.getBrokerUrl(index));
            Assert.fail();
        }
        catch (PulsarAdminException ex) {
            Assert.assertTrue((boolean)ex.getMessage().contains("cannot be transfer to same broker"));
        }
        int transferToIndex = this.generateRandomExcludingX(3, index);
        Assert.assertNotEquals((Object)transferToIndex, (Object)index);
        String transferTo = this.getBrokerUrl(transferToIndex);
        this.admin.namespaces().unloadNamespaceBundle(DEFAULT_NAMESPACE, bundleRange, transferTo);
        broker = this.admin.lookups().lookupTopic(topicName);
        index = this.extractBrokerIndex(broker);
        Assert.assertEquals((int)index, (int)transferToIndex);
    }

    @Test(timeOut=30000L)
    public void testSplitBundleAdminApi() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/testSplitBundleAdminApi";
        this.createNonPartitionedTopicAndRetry(topicName);
        String broker = this.admin.lookups().lookupTopic(topicName);
        log.info("The topic: {} owned by {}", (Object)topicName, (Object)broker);
        BundlesData bundles = this.admin.namespaces().getBundles(DEFAULT_NAMESPACE);
        int numBundles = bundles.getNumBundles();
        List<Long> bundleRanges = bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
        String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1);
        this.admin.namespaces().splitNamespaceBundle(DEFAULT_NAMESPACE, firstBundle, true, null);
        long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2L;
        Awaitility.waitAtMost((long)10L, (TimeUnit)TimeUnit.SECONDS).pollDelay(100L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            BundlesData bundlesData = this.admin.namespaces().getBundles(DEFAULT_NAMESPACE);
            Assert.assertEquals((int)bundlesData.getNumBundles(), (int)(numBundles + 1));
            String lowBundle = String.format("0x%08x", bundleRanges.get(0));
            String midBundle = String.format("0x%08x", mid);
            String highBundle = String.format("0x%08x", bundleRanges.get(1));
            Assert.assertTrue((boolean)bundlesData.getBoundaries().contains(lowBundle));
            Assert.assertTrue((boolean)bundlesData.getBoundaries().contains(midBundle));
            Assert.assertTrue((boolean)bundlesData.getBoundaries().contains(highBundle));
        });
        try {
            this.admin.namespaces().splitNamespaceBundle(DEFAULT_NAMESPACE, "invalid", true, null);
            Assert.fail();
        }
        catch (PulsarAdminException ex) {
            Assert.assertTrue((boolean)ex.getMessage().contains("Invalid bundle range"));
        }
    }

    @Test(timeOut=30000L)
    public void testDeleteNamespace() throws Exception {
        String namespace = "my-tenant/test-delete-namespace";
        String topicName = "persistent://" + namespace + "/test-delete-namespace-topic";
        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, (Set)Sets.newHashSet((Object[])new String[]{this.clusterName}));
        Assert.assertTrue((boolean)this.admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
        this.admin.topics().createPartitionedTopic(topicName, 2);
        String broker = this.admin.lookups().lookupTopic(topicName);
        log.info("The topic: {} owned by: {}", (Object)topicName, (Object)broker);
        this.admin.namespaces().deleteNamespace(namespace, true);
        Assert.assertFalse((boolean)this.admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
    }

    @Test(timeOut=120000L)
    public void testStopBroker() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/test-stop-broker-topic";
        this.createNonPartitionedTopicAndRetry(topicName);
        String broker = this.admin.lookups().lookupTopic(topicName);
        log.info("The topic: {} owned by: {}", (Object)topicName, (Object)broker);
        int idx = this.extractBrokerIndex(broker);
        for (BrokerContainer container : this.pulsarCluster.getBrokers()) {
            String name = container.getHostName();
            if (!name.contains(String.valueOf(idx))) continue;
            container.stop();
        }
        Awaitility.waitAtMost((long)60L, (TimeUnit)TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            String broker1 = this.admin.lookups().lookupTopic(topicName);
            Assert.assertNotEquals((Object)broker1, (Object)broker);
        });
    }

    @Test(timeOut=80000L)
    public void testAntiaffinityPolicy() throws PulsarAdminException {
        String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
        String antiAffinityEnabledNameSpace = "my-tenant/my-ns-filter-anti-affinity-enabled";
        int numPartition = 20;
        List activeBrokers = this.admin.brokers().getActiveBrokers();
        Assert.assertEquals((int)activeBrokers.size(), (int)3);
        for (int i = 0; i < activeBrokers.size(); ++i) {
            String namespace = "my-tenant/my-ns-filter-anti-affinity-enabled-" + i;
            this.admin.namespaces().createNamespace(namespace, 10);
            this.admin.namespaces().setNamespaceAntiAffinityGroup(namespace, "my-anti-affinity-filter");
            this.admin.clusters().createFailureDomain(this.clusterName, "my-anti-affinity-filter", FailureDomain.builder().brokers(Set.of((String)activeBrokers.get(i))).build());
        }
        HashSet<String> result = new HashSet<String>();
        for (int i = 0; i < activeBrokers.size(); ++i) {
            String topic = "persistent://my-tenant/my-ns-filter-anti-affinity-enabled-" + i + "/topic";
            this.admin.topics().createPartitionedTopic(topic, 20);
            Map topicToBroker = this.admin.lookups().lookupPartitionedTopic(topic);
            Assert.assertEquals((int)topicToBroker.size(), (int)20);
            HashSet brokers = new HashSet(topicToBroker.values());
            Assert.assertEquals((int)brokers.size(), (int)1);
            result.add((String)brokers.iterator().next());
            log.info("Topic: {}, lookup result: {}", (Object)topic, brokers.iterator().next());
        }
        Assert.assertEquals((int)result.size(), (int)3);
    }

    @Test(timeOut=300000L)
    public void testIsolationPolicy() throws Exception {
        String name;
        String namespaceIsolationPolicyName = "my-isolation-policy";
        String isolationEnabledNameSpace = "my-tenant/my-isolation-policy-anti-affinity-enabled";
        HashMap<String, String> parameters1 = new HashMap<String, String>();
        parameters1.put("min_limit", "1");
        parameters1.put("usage_threshold", "100");
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            List activeBrokers = (List)this.admin.brokers().getActiveBrokersAsync().get(5L, TimeUnit.SECONDS);
            Assert.assertEquals((int)activeBrokers.size(), (int)3);
        });
        try {
            this.admin.namespaces().createNamespace("my-tenant/my-isolation-policy-anti-affinity-enabled");
        }
        catch (PulsarAdminException.ConflictException conflictException) {
            // empty catch block
        }
        try {
            this.admin.clusters().createNamespaceIsolationPolicy(this.clusterName, "my-isolation-policy", NamespaceIsolationData.builder().namespaces(List.of("my-tenant/my-isolation-policy-anti-affinity-enabled")).autoFailoverPolicy(AutoFailoverPolicyData.builder().policyType(AutoFailoverPolicyType.min_available).parameters(parameters1).build()).primary(List.of(this.getHostName(0))).secondary(List.of(this.getHostName(1))).build());
        }
        catch (PulsarAdminException.ConflictException conflictException) {
            // empty catch block
        }
        String topic = "persistent://my-tenant/my-isolation-policy-anti-affinity-enabled/topic";
        this.createNonPartitionedTopicAndRetry("persistent://my-tenant/my-isolation-policy-anti-affinity-enabled/topic");
        String broker = this.admin.lookups().lookupTopic("persistent://my-tenant/my-isolation-policy-anti-affinity-enabled/topic");
        Assert.assertEquals((int)this.extractBrokerIndex(broker), (int)0);
        for (BrokerContainer container : this.pulsarCluster.getBrokers()) {
            name = container.getHostName();
            if (!name.contains("0")) continue;
            container.stop();
        }
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            List activeBrokers = (List)this.admin.brokers().getActiveBrokersAsync().get(5L, TimeUnit.SECONDS);
            Assert.assertEquals((int)activeBrokers.size(), (int)2);
        });
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            String ownerBroker = (String)this.admin.lookups().lookupTopicAsync("persistent://my-tenant/my-isolation-policy-anti-affinity-enabled/topic").get(5L, TimeUnit.SECONDS);
            Assert.assertEquals((int)this.extractBrokerIndex(ownerBroker), (int)1);
        });
        for (BrokerContainer container : this.pulsarCluster.getBrokers()) {
            name = container.getHostName();
            if (!name.contains("1")) continue;
            container.stop();
        }
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            List activeBrokers = (List)this.admin.brokers().getActiveBrokersAsync().get(5L, TimeUnit.SECONDS);
            Assert.assertEquals((int)activeBrokers.size(), (int)1);
        });
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            try {
                this.admin.lookups().lookupTopicAsync("persistent://my-tenant/my-isolation-policy-anti-affinity-enabled/topic").get(5L, TimeUnit.SECONDS);
                Assert.fail();
            }
            catch (Exception ex) {
                log.error("Failed to lookup topic: ", (Throwable)ex);
                Assertions.assertThat((String)ex.getMessage()).contains(new CharSequence[]{"Service Unavailable"});
            }
        });
    }

    private void createNonPartitionedTopicAndRetry(String topicName) throws Exception {
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            try {
                this.admin.topics().createNonPartitionedTopic(topicName);
                return true;
            }
            catch (PulsarAdminException.ConflictException e) {
                return true;
            }
            catch (Exception e) {
                log.error("Failed to create topic: ", (Throwable)e);
                return false;
            }
        });
    }

    private String getBrokerUrl(int index) {
        return String.format("pulsar-broker-%d:%d", index, 8080);
    }

    private String getHostName(int index) {
        return String.format("pulsar-broker-%d", index);
    }

    private int extractBrokerIndex(String broker) {
        String pattern = "pulsar://.*-(\\d+):\\d+";
        Pattern compiledPattern = Pattern.compile(pattern);
        Matcher matcher = compiledPattern.matcher(broker);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Failed to extract broker index");
        }
        return Integer.parseInt(matcher.group(1));
    }

    private int generateRandomExcludingX(int n, int x) {
        int randomNumber;
        Random random = new Random();
        while ((randomNumber = random.nextInt(n)) == x) {
        }
        return randomNumber;
    }
}

