package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.class */
public class PersistentDispatcherFailoverConsumerTest {
    private BrokerService brokerService;
    private ManagedLedgerFactory mlFactoryMock;
    private ServerCnx serverCnx;
    private ServerCnx serverCnxWithOldVersion;
    private ManagedLedger ledgerMock;
    private ManagedCursor cursorMock;
    private MetadataStore store;
    private ChannelHandlerContext channelCtx;
    private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
    private ZooKeeper mockZk;
    protected PulsarService pulsar;
    final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
    final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class);

    @BeforeMethod
    public void setup() throws Exception {
        this.executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build();
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setClusterName("pulsar-cluster");
        this.pulsar = (PulsarService) BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarService.class, serviceConfiguration);
        this.store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsar)).getLocalMetadataStore();
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsar)).getConfigurationMetadataStore();
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        });
        this.mlFactoryMock = (ManagedLedgerFactory) Mockito.mock(ManagedLedgerFactory.class);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(this.mlFactoryMock).when(this.pulsar)).getManagedLedgerFactory();
        });
        ((PulsarService) Mockito.doReturn(TransactionTestBase.createMockBookKeeper(this.executor)).when(this.pulsar)).getBookKeeperClient();
        this.eventLoopGroup = new NioEventLoopGroup();
        PulsarResources pulsarResources = new PulsarResources(this.store, this.store);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(pulsarResources).when(this.pulsar)).getPulsarResources();
        });
        this.brokerService = (BrokerService) BrokerTestUtil.spyWithClassAndConstructorArgs(BrokerService.class, this.pulsar, this.eventLoopGroup);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(this.brokerService).when(this.pulsar)).getBrokerService();
        });
        this.consumerChanges = new LinkedBlockingQueue<>();
        this.channelCtx = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        ((ChannelHandlerContext) Mockito.doAnswer(invocationOnMock -> {
            ByteBuf byteBuf = (ByteBuf) invocationOnMock.getArgument(0);
            ByteBuf retainedSlice = byteBuf.retainedSlice(4, byteBuf.writerIndex() - 4);
            try {
                int readUnsignedInt = (int) retainedSlice.readUnsignedInt();
                retainedSlice.writerIndex();
                BaseCommand baseCommand = new BaseCommand();
                baseCommand.parseFrom(retainedSlice, readUnsignedInt);
                if (baseCommand.hasActiveConsumerChange()) {
                    this.consumerChanges.put(baseCommand.getActiveConsumerChange());
                }
                return null;
            } finally {
                retainedSlice.release();
            }
        }).when(this.channelCtx)).writeAndFlush(ArgumentMatchers.any(), (ChannelPromise) ArgumentMatchers.any());
        this.serverCnx = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx)).clientAddress();
        Mockito.when(Integer.valueOf(this.serverCnx.getRemoteEndpointProtocolVersion())).thenReturn(Integer.valueOf(ProtocolVersion.v12.getValue()));
        Mockito.when(this.serverCnx.ctx()).thenReturn(this.channelCtx);
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, this.serverCnx)).when(this.serverCnx)).getCommandSender();
        this.serverCnxWithOldVersion = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnxWithOldVersion)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnxWithOldVersion)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnxWithOldVersion)).clientAddress();
        Mockito.when(Integer.valueOf(this.serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())).thenReturn(Integer.valueOf(ProtocolVersion.v11.getValue()));
        Mockito.when(this.serverCnxWithOldVersion.ctx()).thenReturn(this.channelCtx);
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, this.serverCnxWithOldVersion)).when(this.serverCnxWithOldVersion)).getCommandSender();
        NamespaceService namespaceService = (NamespaceService) Mockito.mock(NamespaceService.class);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(namespaceService).when(this.pulsar)).getNamespaceService();
        });
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitOwned((ServiceUnitId) ArgumentMatchers.any(NamespaceBundle.class));
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitActive((TopicName) ArgumentMatchers.any(TopicName.class));
        ((NamespaceService) Mockito.doReturn(CompletableFuture.completedFuture(true)).when(namespaceService)).checkTopicOwnership((TopicName) ArgumentMatchers.any(TopicName.class));
        setupMLAsyncCallbackMocks();
    }

    @AfterMethod(alwaysRun = true)
    public void shutdown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.close();
            this.brokerService = null;
        }
        if (this.pulsar != null) {
            this.pulsar.close();
            this.pulsar = null;
        }
        this.executor.shutdown();
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownGracefully().get();
        }
        this.store.close();
    }

    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursor) Mockito.mock(ManagedCursorImpl.class);
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(this.ledgerMock)).getCursors();
        ((ManagedCursor) Mockito.doReturn("mockCursor").when(this.cursorMock)).getName();
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(PersistentDispatcherFailoverConsumerTest.this.ledgerMock, (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(ArgumentMatchers.matches(".*success.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(ArgumentMatchers.matches(".*fail.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1L, 1L), (ByteBuf) null, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncAddEntry((byte[]) ArgumentMatchers.any(byte[].class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.4
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(PersistentDispatcherFailoverConsumerTest.this.cursorMock, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncOpenCursor(ArgumentMatchers.matches(".*success.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.5
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.6
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDeleteCursor(ArgumentMatchers.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
    }

    private void verifyActiveConsumerChange(CommandActiveConsumerChange commandActiveConsumerChange, long j, boolean z) {
        AssertJUnit.assertEquals(j, commandActiveConsumerChange.getConsumerId());
        AssertJUnit.assertEquals(z, commandActiveConsumerChange.isIsActive());
    }

    @Test(timeOut = 10000)
    public void testAddConsumerWhenClosed() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, CommandSubscribe.SubType.Failover, 0, persistentTopic, new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false));
        persistentDispatcherSingleActiveConsumer.close().get();
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer);
        ((Consumer) Mockito.verify(consumer, Mockito.times(1))).disconnect();
        AssertJUnit.assertEquals(0, persistentDispatcherSingleActiveConsumer.consumers.size());
    }

    @Test
    public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, CommandSubscribe.SubType.Failover, 0, persistentTopic, persistentSubscription);
        Assert.assertFalse(persistentDispatcherSingleActiveConsumer.isConsumerConnected());
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", true, this.serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L);
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer);
        List consumers = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers.get(0)).consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(1, consumers.size());
        Assert.assertNull(this.consumerChanges.poll());
        ((ChannelHandlerContext) Mockito.verify(this.channelCtx, Mockito.times(0))).write(ArgumentMatchers.any());
        persistentDispatcherSingleActiveConsumer.addConsumer(new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 2L, 0, "Cons2", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        List consumers2 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers2.get(0)).consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(2, consumers2.size());
        verifyActiveConsumerChange(this.consumerChanges.take(), 2L, false);
        ((ChannelHandlerContext) Mockito.verify(this.channelCtx, Mockito.times(1))).writeAndFlush(ArgumentMatchers.any(), (ChannelPromise) ArgumentMatchers.any());
    }

    @Test
    public void testAddRemoveConsumer() throws Exception {
        log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, CommandSubscribe.SubType.Failover, 4, persistentTopic, persistentSubscription);
        Assert.assertFalse(persistentDispatcherSingleActiveConsumer.isConsumerConnected());
        Consumer consumer = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer);
        List consumers = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers.get(0)).consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(1, consumers.size());
        verifyActiveConsumerChange(this.consumerChanges.take(), 1L, true);
        ((Consumer) Mockito.verify(consumer, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer);
        List consumers2 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(((Consumer) consumers2.get(0)).consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(2, consumers2.size());
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        CommandActiveConsumerChange poll = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll);
        verifyActiveConsumerChange(poll, 1L, true);
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        Consumer consumer2 = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 2L, 0, "Cons2", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer2);
        List consumers3 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(3, consumers3.size());
        CommandActiveConsumerChange poll2 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll2);
        verifyActiveConsumerChange(poll2, 2L, false);
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        ((Consumer) Mockito.verify(consumer2, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        Consumer consumer3 = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 0L, 0, "Cons0", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer3);
        List consumers4 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer3.consumerName());
        AssertJUnit.assertEquals(4, consumers4.size());
        CommandActiveConsumerChange poll3 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll3);
        verifyActiveConsumerChange(poll3, 0L, true);
        CommandActiveConsumerChange poll4 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll4);
        verifyActiveConsumerChange(poll4, 1L, false);
        CommandActiveConsumerChange poll5 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll5);
        verifyActiveConsumerChange(poll5, 1L, false);
        CommandActiveConsumerChange poll6 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll6);
        verifyActiveConsumerChange(poll6, 2L, false);
        ((Consumer) Mockito.verify(consumer3, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer3));
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer3));
        ((Consumer) Mockito.verify(consumer2, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        ((Consumer) Mockito.verify(consumer2, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer3));
        persistentDispatcherSingleActiveConsumer.removeConsumer(consumer2);
        List consumers5 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(3, consumers5.size());
        Assert.assertNull(this.consumerChanges.poll());
        Assert.assertFalse(persistentDispatcherSingleActiveConsumer.canUnsubscribe(consumer3));
        persistentDispatcherSingleActiveConsumer.removeConsumer(consumer3);
        List consumers6 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(2, consumers6.size());
        CommandActiveConsumerChange poll7 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll7);
        verifyActiveConsumerChange(poll7, 1L, true);
        CommandActiveConsumerChange poll8 = this.consumerChanges.poll(10L, TimeUnit.SECONDS);
        Assert.assertNotNull(poll8);
        verifyActiveConsumerChange(poll8, 1L, true);
        String str = "";
        try {
            persistentDispatcherSingleActiveConsumer.removeConsumer(consumer3);
        } catch (Exception e) {
            str = e.getMessage();
        }
        AssertJUnit.assertEquals(str, "Consumer was not connected");
        persistentDispatcherSingleActiveConsumer.removeConsumer(consumer);
        List consumers7 = persistentDispatcherSingleActiveConsumer.getConsumers();
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        AssertJUnit.assertEquals(1, consumers7.size());
        Assert.assertNull(this.consumerChanges.poll());
        AssertJUnit.assertTrue(persistentDispatcherSingleActiveConsumer.canUnsubscribe(consumer));
    }

    @Test
    public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
        log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = new PersistentDispatcherSingleActiveConsumer(this.cursorMock, CommandSubscribe.SubType.Failover, -1, persistentTopic, persistentSubscription);
        Assert.assertFalse(persistentDispatcherSingleActiveConsumer.isConsumerConnected());
        Consumer consumer = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 1L, 1, "Cons1", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer);
        AssertJUnit.assertEquals(1, persistentDispatcherSingleActiveConsumer.getConsumers().size());
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        Consumer consumer2 = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 2L, 1, "Cons2", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer2);
        AssertJUnit.assertEquals(2, persistentDispatcherSingleActiveConsumer.getConsumers().size());
        verifyActiveConsumerChange(this.consumerChanges.take(), 2L, false);
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        ((Consumer) Mockito.verify(consumer2, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        Consumer consumer3 = (Consumer) Mockito.spy(new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 3L, 0, "Cons3", true, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L));
        persistentDispatcherSingleActiveConsumer.addConsumer(consumer3);
        AssertJUnit.assertEquals(3, persistentDispatcherSingleActiveConsumer.getConsumers().size());
        verifyActiveConsumerChange(this.consumerChanges.take(), 3L, false);
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer.consumerName());
        ((Consumer) Mockito.verify(consumer3, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer));
        persistentDispatcherSingleActiveConsumer.removeConsumer(consumer);
        AssertJUnit.assertEquals(2, persistentDispatcherSingleActiveConsumer.getConsumers().size());
        verifyActiveConsumerChange(this.consumerChanges.take(), 2L, true);
        AssertJUnit.assertSame(persistentDispatcherSingleActiveConsumer.getActiveConsumer().consumerName(), consumer2.consumerName());
        ((Consumer) Mockito.verify(consumer2, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer2));
        ((Consumer) Mockito.verify(consumer3, Mockito.times(1))).notifyActiveConsumerChange((Consumer) ArgumentMatchers.same(consumer2));
    }

    @Test
    public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = new PersistentDispatcherMultipleConsumers(persistentTopic, this.cursorMock, (Subscription) null);
        Consumer createConsumer = createConsumer(persistentTopic, 0, 2, false, 1);
        Consumer createConsumer2 = createConsumer(persistentTopic, 0, 2, false, 2);
        Consumer createConsumer3 = createConsumer(persistentTopic, 0, 2, false, 3);
        Consumer createConsumer4 = createConsumer(persistentTopic, 1, 2, false, 4);
        Consumer createConsumer5 = createConsumer(persistentTopic, 1, 1, false, 5);
        Consumer createConsumer6 = createConsumer(persistentTopic, 1, 2, false, 6);
        Consumer createConsumer7 = createConsumer(persistentTopic, 2, 1, false, 7);
        Consumer createConsumer8 = createConsumer(persistentTopic, 2, 1, false, 8);
        Consumer createConsumer9 = createConsumer(persistentTopic, 2, 1, false, 9);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer2);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer3);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer4);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer5);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer6);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer7);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer8);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer9);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer5);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer6);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer6);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer7);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer8);
        Consumer createConsumer10 = createConsumer(persistentTopic, 0, 2, false, 10);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer10);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer10);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer10);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer9);
    }

    @Test
    public void testFewBlockedConsumerSamePriority() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = new PersistentDispatcherMultipleConsumers(persistentTopic, this.cursorMock, (Subscription) null);
        Consumer createConsumer = createConsumer(persistentTopic, 0, 2, false, 1);
        Consumer createConsumer2 = createConsumer(persistentTopic, 0, 2, false, 2);
        Consumer createConsumer3 = createConsumer(persistentTopic, 0, 2, false, 3);
        Consumer createConsumer4 = createConsumer(persistentTopic, 0, 2, false, 4);
        Consumer createConsumer5 = createConsumer(persistentTopic, 0, 1, true, 5);
        Consumer createConsumer6 = createConsumer(persistentTopic, 0, 2, true, 6);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer2);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer3);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer4);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer5);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer6);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertNull(getNextConsumer(persistentDispatcherMultipleConsumers));
    }

    @Test
    public void testFewBlockedConsumerDifferentPriority() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = new PersistentDispatcherMultipleConsumers(persistentTopic, this.cursorMock, (Subscription) null);
        Consumer createConsumer = createConsumer(persistentTopic, 0, 2, false, 1);
        Consumer createConsumer2 = createConsumer(persistentTopic, 0, 2, false, 2);
        Consumer createConsumer3 = createConsumer(persistentTopic, 0, 2, false, 3);
        Consumer createConsumer4 = createConsumer(persistentTopic, 0, 2, false, 4);
        Consumer createConsumer5 = createConsumer(persistentTopic, 0, 1, true, 5);
        Consumer createConsumer6 = createConsumer(persistentTopic, 0, 2, true, 6);
        Consumer createConsumer7 = createConsumer(persistentTopic, 1, 2, false, 7);
        Consumer createConsumer8 = createConsumer(persistentTopic, 1, 10, true, 8);
        Consumer createConsumer9 = createConsumer(persistentTopic, 1, 2, false, 9);
        Consumer createConsumer10 = createConsumer(persistentTopic, 2, 2, false, 10);
        Consumer createConsumer11 = createConsumer(persistentTopic, 2, 10, true, 11);
        Consumer createConsumer12 = createConsumer(persistentTopic, 2, 2, false, 12);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer2);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer3);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer4);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer5);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer6);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer7);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer8);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer9);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer10);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer11);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer12);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer2);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer3);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer7);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer9);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer7);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer9);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer10);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer12);
        Consumer createConsumer13 = createConsumer(persistentTopic, 0, 2, false, 13);
        Consumer createConsumer14 = createConsumer(persistentTopic, 0, 2, true, 14);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer13);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer14);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer13);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer13);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer10);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer12);
        Assert.assertNull(getNextConsumer(persistentDispatcherMultipleConsumers));
    }

    @Test
    public void testFewBlockedConsumerDifferentPriority2() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://part-perf/global/perf.t1/ptopic", this.ledgerMock, this.brokerService);
        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = new PersistentDispatcherMultipleConsumers(persistentTopic, this.cursorMock, (Subscription) null);
        Consumer createConsumer = createConsumer(persistentTopic, 0, 2, true, 1);
        Consumer createConsumer2 = createConsumer(persistentTopic, 0, 2, true, 2);
        Consumer createConsumer3 = createConsumer(persistentTopic, 0, 2, true, 3);
        Consumer createConsumer4 = createConsumer(persistentTopic, 1, 2, false, 4);
        Consumer createConsumer5 = createConsumer(persistentTopic, 1, 1, false, 5);
        Consumer createConsumer6 = createConsumer(persistentTopic, 2, 1, false, 6);
        Consumer createConsumer7 = createConsumer(persistentTopic, 2, 2, true, 7);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer2);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer3);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer4);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer5);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer6);
        persistentDispatcherMultipleConsumers.addConsumer(createConsumer7);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer5);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer4);
        Assert.assertEquals(getNextConsumer(persistentDispatcherMultipleConsumers), createConsumer6);
        Assert.assertNull(getNextConsumer(persistentDispatcherMultipleConsumers));
    }

    private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers) throws Exception {
        Consumer nextConsumer = persistentDispatcherMultipleConsumers.getNextConsumer();
        if (nextConsumer == null) {
            return null;
        }
        Field declaredField = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
        declaredField.setAccessible(true);
        ((AtomicIntegerFieldUpdater) declaredField.get(nextConsumer)).decrementAndGet(nextConsumer);
        return nextConsumer;
    }

    private Consumer createConsumer(PersistentTopic persistentTopic, int i, int i2, boolean z, int i3) throws Exception {
        Consumer consumer = new Consumer(new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false), CommandSubscribe.SubType.Shared, "test-topic", i3, i, "" + i3, true, this.serverCnx, "appId", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest, -1L);
        try {
            consumer.flowPermits(i2);
        } catch (Exception e) {
        }
        Field declaredField = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
        declaredField.setAccessible(true);
        declaredField.set(consumer, Boolean.valueOf(z));
        return consumer;
    }
}
