package org.apache.pulsar.broker.loadbalance;

import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.class */
public class LeaderElectionServiceTest {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LeaderElectionServiceTest.class);
    private LocalBookkeeperEnsemble bkEnsemble;
    private LeaderElectionService.LeaderListener listener;

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest$MockPulsarService.class */
    private static class MockPulsarService extends PulsarService {
        public MockPulsarService(ServiceConfiguration serviceConfiguration) {
            super(serviceConfiguration);
        }

        public MockPulsarService(ServiceConfiguration serviceConfiguration, Optional<WorkerService> optional, Consumer<Integer> consumer) {
            super(serviceConfiguration, optional, consumer);
        }

        protected void startLeaderElectionService() {
        }
    }

    @BeforeMethod
    public void setup() throws Exception {
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble.start();
        log.info("---- bk started ----");
        this.listener = new LeaderElectionService.LeaderListener() { // from class: org.apache.pulsar.broker.loadbalance.LeaderElectionServiceTest.1
            public void brokerIsTheLeaderNow() {
                LeaderElectionServiceTest.log.info("i am a leader");
            }

            public void brokerIsAFollowerNow() {
                LeaderElectionServiceTest.log.info("i am a follower");
            }
        };
    }

    @AfterMethod
    void shutdown() throws Exception {
        this.bkEnsemble.stop();
        log.info("---- bk stopped ----");
    }

    @Test
    public void electedShouldBeTrue() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        ZooKeeperCache zooKeeperCache = (ZooKeeperCache) Mockito.mock(ZooKeeperCache.class);
        PulsarService pulsarService = (PulsarService) Mockito.mock(PulsarService.class);
        Mockito.when(pulsarService.getZkClient()).thenReturn(this.bkEnsemble.getZkClient());
        Mockito.when(pulsarService.getExecutor()).thenReturn(newSingleThreadScheduledExecutor);
        Mockito.when(pulsarService.getSafeWebServiceAddress()).thenReturn("http://localhost:8080");
        Mockito.when(zooKeeperCache.getZooKeeper()).thenReturn(this.bkEnsemble.getZkClient());
        Mockito.when(pulsarService.getLocalZkCache()).thenReturn(zooKeeperCache);
        LeaderElectionService leaderElectionService = new LeaderElectionService(pulsarService, this.listener);
        leaderElectionService.start();
        Assert.assertTrue(leaderElectionService.isElected());
        Assert.assertTrue(leaderElectionService.isLeader());
        Assert.assertEquals(leaderElectionService.getCurrentLeader().getServiceUrl(), "http://localhost:8080");
        log.info("leader state {} {} {}", Boolean.valueOf(leaderElectionService.isElected()), Boolean.valueOf(leaderElectionService.isLeader()), leaderElectionService.getCurrentLeader().getServiceUrl());
        LeaderElectionService leaderElectionService2 = new LeaderElectionService(pulsarService, this.listener);
        leaderElectionService2.start();
        Assert.assertTrue(leaderElectionService2.isElected());
        Assert.assertFalse(leaderElectionService2.isLeader());
        Assert.assertEquals(leaderElectionService2.getCurrentLeader().getServiceUrl(), "http://localhost:8080");
        log.info("follower state {} {} {}", Boolean.valueOf(leaderElectionService2.isElected()), Boolean.valueOf(leaderElectionService2.isLeader()), leaderElectionService2.getCurrentLeader().getServiceUrl());
    }

    @Test
    public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerException, PulsarClientException, PulsarAdminException {
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerServicePort(Optional.of(6650));
        serviceConfiguration.setWebServicePort(Optional.of(8080));
        serviceConfiguration.setClusterName("elect-test");
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        PulsarService pulsarService = (PulsarService) Mockito.spy(new MockPulsarService(serviceConfiguration));
        pulsarService.start();
        ((PulsarService) Mockito.doReturn((Object) null).when(pulsarService)).getLeaderElectionService();
        PulsarAdmin build = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
        build.clusters().createCluster("elect-test", new ClusterData("http://localhost:8080"));
        build.tenants().createTenant("elect", new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("elect-test")));
        build.namespaces().createNamespace("elect/ns", 16);
        PulsarClient build2 = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").startingBackoffInterval(1L, TimeUnit.MILLISECONDS).maxBackoffInterval(100L, TimeUnit.MILLISECONDS).operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        checkLookupException("elect", "ns", build2);
        LeaderElectionService leaderElectionService = (LeaderElectionService) Mockito.mock(LeaderElectionService.class);
        Mockito.when(Boolean.valueOf(leaderElectionService.isElected())).thenReturn(false);
        ((PulsarService) Mockito.doReturn(leaderElectionService).when(pulsarService)).getLeaderElectionService();
        checkLookupException("elect", "ns", build2);
        Mockito.when(Boolean.valueOf(leaderElectionService.isElected())).thenReturn(true);
        Mockito.when(Boolean.valueOf(leaderElectionService.isLeader())).thenReturn(true);
        Mockito.when(leaderElectionService.getCurrentLeader()).thenReturn(new LeaderBroker("http://localhost:8080"));
        build2.newProducer().topic("persistent://elect/ns/1p").create().getTopic();
    }

    private void checkLookupException(String str, String str2, PulsarClient pulsarClient) {
        try {
            pulsarClient.newProducer().topic("persistent://" + str + "/" + str2 + "/1p").create();
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.LookupException);
            Assert.assertEquals(e.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!");
        }
    }
}
