package org.apache.pulsar.broker.service;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/MessageCumulativeAckTest.class */
public class MessageCumulativeAckTest {
    private final int consumerId = 1;
    private BrokerService brokerService;
    private ServerCnx serverCnx;
    private MetadataStore store;
    protected PulsarService pulsar;
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;
    private PersistentSubscription sub;

    @BeforeMethod
    public void setup() throws Exception {
        this.executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setLoadBalancerOverrideBrokerNicSpeedGbps(Double.valueOf(1.0d));
        serviceConfiguration.setClusterName("pulsar-cluster");
        this.pulsar = (PulsarService) BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarService.class, serviceConfiguration);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        ((PulsarService) Mockito.doReturn((ManagedLedgerFactory) Mockito.mock(ManagedLedgerFactory.class)).when(this.pulsar)).getManagedLedgerFactory();
        ((PulsarService) Mockito.doReturn(TransactionTestBase.createMockBookKeeper(this.executor)).when(this.pulsar)).getBookKeeperClient();
        this.store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsar)).getLocalMetadataStore();
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsar)).getConfigurationMetadataStore();
        PulsarResources pulsarResources = new PulsarResources(this.store, this.store);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(pulsarResources).when(this.pulsar)).getPulsarResources();
        });
        this.serverCnx = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx)).clientAddress();
        Mockito.when(Integer.valueOf(this.serverCnx.getRemoteEndpointProtocolVersion())).thenReturn(Integer.valueOf(ProtocolVersion.v12.getValue()));
        Mockito.when(this.serverCnx.ctx()).thenReturn((ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class));
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, this.serverCnx)).when(this.serverCnx)).getCommandSender();
        this.eventLoopGroup = new NioEventLoopGroup();
        this.brokerService = (BrokerService) BrokerTestUtil.spyWithClassAndConstructorArgs(BrokerService.class, this.pulsar, this.eventLoopGroup);
        PulsarServiceMockSupport.mockPulsarServiceProps(this.pulsar, () -> {
            ((PulsarService) Mockito.doReturn(this.brokerService).when(this.pulsar)).getBrokerService();
        });
        this.sub = (PersistentSubscription) Mockito.spy(new PersistentSubscription(new PersistentTopic(TopicName.get("MessageCumulativeAckTest").toString(), (ManagedLedger) Mockito.mock(ManagedLedger.class), this.brokerService), "sub-1", (ManagedCursor) Mockito.mock(ManagedCursorImpl.class), false));
        ((PersistentSubscription) Mockito.doNothing().when(this.sub)).acknowledgeMessage((List) ArgumentMatchers.any(), (CommandAck.AckType) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }

    @AfterMethod(alwaysRun = true)
    public void shutdown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.close();
            this.brokerService = null;
        }
        if (this.pulsar != null) {
            this.pulsar.close();
            this.pulsar = null;
        }
        this.executor.shutdown();
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownGracefully().get();
        }
        this.store.close();
        this.sub = null;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "individualAckModes")
    public static Object[][] individualAckModes() {
        return new Object[]{new Object[]{CommandSubscribe.SubType.Shared}, new Object[]{CommandSubscribe.SubType.Key_Shared}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "notIndividualAckModes")
    public static Object[][] notIndividualAckModes() {
        return new Object[]{new Object[]{CommandSubscribe.SubType.Exclusive}, new Object[]{CommandSubscribe.SubType.Failover}};
    }

    @Test(timeOut = 5000, dataProvider = "individualAckModes")
    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = new Consumer(this.sub, subType, "topic-1", 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        CommandAck commandAck = new CommandAck();
        commandAck.setAckType(CommandAck.AckType.Cumulative);
        commandAck.setConsumerId(1L);
        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
        consumer.messageAcked(commandAck).get();
        ((PersistentSubscription) Mockito.verify(this.sub, Mockito.never())).acknowledgeMessage((List) ArgumentMatchers.any(), (CommandAck.AckType) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }

    @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = new Consumer(this.sub, subType, "topic-1", 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        CommandAck commandAck = new CommandAck();
        commandAck.setAckType(CommandAck.AckType.Cumulative);
        commandAck.setConsumerId(1L);
        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
        consumer.messageAcked(commandAck).get();
        ((PersistentSubscription) Mockito.verify(this.sub, Mockito.times(1))).acknowledgeMessage((List) ArgumentMatchers.any(), (CommandAck.AckType) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }

    @Test(timeOut = 5000)
    public void testAckWithMoreThanNoneMessageIds() throws Exception {
        Consumer consumer = new Consumer(this.sub, CommandSubscribe.SubType.Failover, "topic-1", 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        CommandAck commandAck = new CommandAck();
        commandAck.setAckType(CommandAck.AckType.Cumulative);
        commandAck.setConsumerId(1L);
        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
        commandAck.addMessageId().setEntryId(0L).setLedgerId(2L);
        consumer.messageAcked(commandAck).get();
        ((PersistentSubscription) Mockito.verify(this.sub, Mockito.never())).acknowledgeMessage((List) ArgumentMatchers.any(), (CommandAck.AckType) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }
}
