package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.class */
public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase {
    // Timeout applied to every test in this class through @Test(timeOut = testTimeout).
    private static final long testTimeout = 90000;
    // Class-wide SLF4J logger used to trace produced/received messages and tracker sizes.
    private static final Logger log = LoggerFactory.getLogger(PerMessageUnAcknowledgedRedeliveryTest.class);
    // Two seconds expressed in milliseconds; passed to ConsumerConfiguration.setAckTimeout(...) and
    // also used as the Thread.sleep(...) duration before the final receive pass of each test.
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);

    /**
     * Runs before each test method (TestNG {@code @BeforeMethod}) and delegates to the
     * base class's {@code internalSetup()}.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
    }

    /**
     * Runs after each test method (TestNG {@code @AfterMethod}) and delegates to the
     * base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks unacknowledged-message tracking on a non-partitioned topic with a
     * {@link SubscriptionType#Shared} subscription.
     *
     * <ol>
     *   <li>Publish 5 messages and receive them without acking; the tracker must hold 5 entries.</li>
     *   <li>Publish 5 more and ack each on receipt; 5 messages are received and the tracker
     *       must still hold 5 entries.</li>
     *   <li>Toggle the tracker, publish 5 more and receive them without acking; the tracker
     *       must hold 10 entries.</li>
     *   <li>Sleep for the ack timeout, then receive and ack whatever is delivered; exactly 5
     *       messages must arrive and 5 entries must remain tracked.</li>
     * </ol>
     *
     * @throws Exception on any client failure or interruption
     */
    @Test(timeOut = testTimeout)
    public void testSharedAckedNormalTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-testSharedAckedNormalTopic";
        final String messagePrefix = "my-message-testSharedAckedNormalTopic-";
        final int batchSize = 5;

        final Producer producer = this.pulsarClient.createProducer(topicName);
        final ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setReceiverQueueSize(50);
        conf.setAckTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS);
        conf.setSubscriptionType(SubscriptionType.Shared);
        // The tracker accessor is only available on ConsumerImpl, so the subscribe() result
        // must be downcast explicitly (same approach as the partitioned-consumer helper).
        final ConsumerImpl consumer = (ConsumerImpl) this.pulsarClient.subscribe(
                topicName, "my-ex-subscription-testSharedAckedNormalTopic", conf);

        // Phase 1: publish a batch and drain it without acknowledging anything.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        // First receive() blocks until a message arrives; subsequent polls give up after 100 ms.
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        long trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testSharedAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);

        // Phase 2: publish a second batch and acknowledge each message as it is received.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        int ackedCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            ackedCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testSharedAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
        Assert.assertEquals(ackedCount, 5);

        // Phase 3: toggle the tracker by hand, then publish and drain a third batch without acking.
        consumer.getUnAckedMessageTracker().toggle();
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testSharedAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 10L);

        // Phase 4: wait one ack-timeout period, then receive and ack everything that is delivered.
        Thread.sleep(this.ackTimeOutMillis);
        int redeliveredCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            redeliveredCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        Assert.assertEquals(redeliveredCount, 5);
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testSharedAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
    }

    /**
     * Checks unacknowledged-message tracking on a non-partitioned topic with a
     * {@link SubscriptionType#Exclusive} subscription.
     *
     * <ol>
     *   <li>Publish 5 messages and receive them without acking; the tracker must hold 5 entries.</li>
     *   <li>Publish 5 more and ack each on receipt; 5 messages are received and the tracker
     *       must still hold 5 entries.</li>
     *   <li>Toggle the tracker, publish 5 more and receive them without acking; the tracker
     *       must hold 10 entries.</li>
     *   <li>Sleep for the ack timeout, then receive and ack whatever is delivered; exactly 10
     *       messages must arrive and the tracker must end up empty.</li>
     * </ol>
     *
     * @throws Exception on any client failure or interruption
     */
    @Test(timeOut = testTimeout)
    public void testExclusiveAckedNormalTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-testExclusiveAckedNormalTopic";
        final String messagePrefix = "my-message-testExclusiveAckedNormalTopic-";
        final int batchSize = 5;

        final Producer producer = this.pulsarClient.createProducer(topicName);
        final ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setReceiverQueueSize(50);
        conf.setAckTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS);
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        // The tracker accessor is only available on ConsumerImpl, so the subscribe() result
        // must be downcast explicitly (same approach as the partitioned-consumer helper).
        final ConsumerImpl consumer = (ConsumerImpl) this.pulsarClient.subscribe(
                topicName, "my-ex-subscription-testExclusiveAckedNormalTopic", conf);

        // Phase 1: publish a batch and drain it without acknowledging anything.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        // First receive() blocks until a message arrives; subsequent polls give up after 100 ms.
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        long trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testExclusiveAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);

        // Phase 2: publish a second batch and acknowledge each message as it is received.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        int ackedCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            ackedCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testExclusiveAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
        Assert.assertEquals(ackedCount, 5);

        // Phase 3: toggle the tracker by hand, then publish and drain a third batch without acking.
        consumer.getUnAckedMessageTracker().toggle();
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testExclusiveAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 10L);

        // Phase 4: wait one ack-timeout period, then receive and ack everything that is delivered.
        Thread.sleep(this.ackTimeOutMillis);
        int redeliveredCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            redeliveredCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        Assert.assertEquals(redeliveredCount, 10);
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testExclusiveAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 0L);
    }

    /**
     * Checks unacknowledged-message tracking on a non-partitioned topic with a
     * {@link SubscriptionType#Failover} subscription.
     *
     * <ol>
     *   <li>Publish 5 messages and receive them without acking; the tracker must hold 5 entries.</li>
     *   <li>Publish 5 more and ack each on receipt; 5 messages are received and the tracker
     *       must still hold 5 entries.</li>
     *   <li>Toggle the tracker, publish 5 more and receive them without acking; the tracker
     *       must hold 10 entries.</li>
     *   <li>Sleep for the ack timeout, then receive and ack whatever is delivered; exactly 10
     *       messages must arrive and the tracker must end up empty.</li>
     * </ol>
     *
     * @throws Exception on any client failure or interruption
     */
    @Test(timeOut = testTimeout)
    public void testFailoverAckedNormalTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-testFailoverAckedNormalTopic";
        final String messagePrefix = "my-message-testFailoverAckedNormalTopic-";
        final int batchSize = 5;

        final Producer producer = this.pulsarClient.createProducer(topicName);
        final ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setReceiverQueueSize(50);
        conf.setAckTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS);
        conf.setSubscriptionType(SubscriptionType.Failover);
        // The tracker accessor is only available on ConsumerImpl, so the subscribe() result
        // must be downcast explicitly (same approach as the partitioned-consumer helper).
        final ConsumerImpl consumer = (ConsumerImpl) this.pulsarClient.subscribe(
                topicName, "my-ex-subscription-testFailoverAckedNormalTopic", conf);

        // Phase 1: publish a batch and drain it without acknowledging anything.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        // First receive() blocks until a message arrives; subsequent polls give up after 100 ms.
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        long trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testFailoverAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);

        // Phase 2: publish a second batch and acknowledge each message as it is received.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        int ackedCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            ackedCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testFailoverAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
        Assert.assertEquals(ackedCount, 5);

        // Phase 3: toggle the tracker by hand, then publish and drain a third batch without acking.
        consumer.getUnAckedMessageTracker().toggle();
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testFailoverAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 10L);

        // Phase 4: wait one ack-timeout period, then receive and ack everything that is delivered.
        Thread.sleep(this.ackTimeOutMillis);
        int redeliveredCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            redeliveredCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        Assert.assertEquals(redeliveredCount, 10);
        trackerSize = consumer.getUnAckedMessageTracker().size();
        log.info("testFailoverAckedNormalTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 0L);
    }

    /**
     * Sums the unacked-message tracker sizes of every per-partition consumer inside a
     * partitioned consumer.
     *
     * @param consumer a consumer that is cast to {@link PartitionedConsumerImpl}; any other
     *                 implementation makes the cast fail with {@link ClassCastException}
     * @return the total number of tracked unacknowledged messages across all partitions
     */
    private static long getUnackedMessagesCountInPartitionedConsumer(Consumer consumer) {
        final PartitionedConsumerImpl partitioned = (PartitionedConsumerImpl) consumer;
        return partitioned.getConsumers()
                .stream()
                .mapToLong(partitionConsumer -> partitionConsumer.getUnAckedMessageTracker().size())
                .sum();
    }

    /**
     * Checks unacknowledged-message tracking on a 3-partition topic with a
     * {@link SubscriptionType#Shared} subscription and a round-robin producer. Tracker sizes
     * are summed across the per-partition consumers.
     *
     * <ol>
     *   <li>Publish 5 messages and receive them without acking; 5 entries must be tracked in total.</li>
     *   <li>Publish 5 more and ack each on receipt; 5 messages are received and 5 entries
     *       must still be tracked.</li>
     *   <li>Toggle every per-partition tracker, publish 5 more and receive them without acking;
     *       10 entries must be tracked.</li>
     *   <li>Sleep for the ack timeout, then receive and ack whatever is delivered; exactly 5
     *       messages must arrive and 5 entries must remain tracked.</li>
     * </ol>
     *
     * @throws Exception on any admin/client failure or interruption
     */
    @Test(timeOut = testTimeout)
    public void testSharedAckedPartitionedTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-testSharedAckedPartitionedTopic";
        final String messagePrefix = "my-message-testSharedAckedPartitionedTopic-";
        final int batchSize = 5;

        this.admin.persistentTopics().createPartitionedTopic(topicName, 3);
        final ProducerConfiguration producerConf = new ProducerConfiguration();
        producerConf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        final Producer producer = this.pulsarClient.createProducer(topicName, producerConf);

        final ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setReceiverQueueSize(50);
        conf.setAckTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS);
        conf.setSubscriptionType(SubscriptionType.Shared);
        // getConsumers() is only available on PartitionedConsumerImpl, so the subscribe()
        // result must be downcast explicitly.
        final PartitionedConsumerImpl consumer = (PartitionedConsumerImpl) this.pulsarClient.subscribe(
                topicName, "my-ex-subscription-testSharedAckedPartitionedTopic", conf);

        // Phase 1: publish a batch and drain it without acknowledging anything.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        // First receive() blocks until a message arrives; subsequent polls give up after 100 ms.
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        long trackerSize = getUnackedMessagesCountInPartitionedConsumer(consumer);
        log.info("testSharedAckedPartitionedTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);

        // Phase 2: publish a second batch and acknowledge each message as it is received.
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        int ackedCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            ackedCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        trackerSize = getUnackedMessagesCountInPartitionedConsumer(consumer);
        log.info("testSharedAckedPartitionedTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
        Assert.assertEquals(ackedCount, 5);

        // Phase 3: toggle each partition's tracker by hand, then publish and drain a third
        // batch without acking.
        consumer.getConsumers().forEach(partitionConsumer -> partitionConsumer.getUnAckedMessageTracker().toggle());
        for (int i = 0; i < batchSize; i++) {
            final String payload = messagePrefix + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
        }
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        trackerSize = getUnackedMessagesCountInPartitionedConsumer(consumer);
        log.info("testSharedAckedPartitionedTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 10L);

        // Phase 4: wait one ack-timeout period, then receive and ack everything that is delivered.
        Thread.sleep(this.ackTimeOutMillis);
        int redeliveredCount = 0;
        for (Message msg = consumer.receive(); msg != null; msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            redeliveredCount++;
            log.info("Consumer received : {}", new String(msg.getData(), StandardCharsets.UTF_8));
            consumer.acknowledge(msg);
        }
        Assert.assertEquals(redeliveredCount, 5);
        trackerSize = getUnackedMessagesCountInPartitionedConsumer(consumer);
        log.info("testSharedAckedPartitionedTopic Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 5L);
    }
}
