package org.apache.pulsar.client.impl;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/ZeroQueueSizeTest.class */
/**
 * Tests for consumers configured with a receiver queue size of zero.
 *
 * <p>The configuration tests check which queue sizes are accepted; the remaining tests publish
 * {@code totalMessages} messages to a topic and verify that a zero-queue consumer hands them back
 * one at a time, in order, while {@code numMessagesInQueue()} stays at zero.
 */
public class ZeroQueueSizeTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(ZeroQueueSizeTest.class);

    /** Number of messages published by each produce/consume test. */
    private final int totalMessages = 10;

    /** Delegates to {@code baseSetup()} once before the tests of this class run. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        baseSetup();
    }

    /** Delegates to {@code internalCleanup()} once after the tests of this class have run. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /** A receiver queue size of zero must be accepted by the configuration object. */
    @Test
    public void validQueueSizeConfig() {
        // Any exception propagates to TestNG, which fails the test and reports the cause.
        new ConsumerConfiguration().setReceiverQueueSize(0);
    }

    /** A negative receiver queue size must be rejected with {@link IllegalArgumentException}. */
    @Test(expectedExceptions = {IllegalArgumentException.class})
    public void InvalidQueueSizeConfig() {
        new ConsumerConfiguration().setReceiverQueueSize(-1);
    }

    /**
     * Calling {@code receive(timeout, unit)} on a zero-queue consumer is expected to fail with
     * {@link PulsarClientException.InvalidConfigurationException}.
     */
    @Test(expectedExceptions = {PulsarClientException.InvalidConfigurationException.class})
    public void zeroQueueSizeReceieveAsyncInCompatibility() throws PulsarClientException {
        String topicName = "persistent://prop/use/ns-abc/topic-zeroQueueSizeReceieveAsyncInCompatibility";
        String subscriptionName = "my-ex-subscription-zeroQueueSizeReceieveAsyncInCompatibility";

        ConsumerConfiguration configuration = new ConsumerConfiguration();
        configuration.setReceiverQueueSize(0);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subscriptionName, configuration);
        try {
            consumer.receive(10, TimeUnit.SECONDS);
        } finally {
            closeAll(null, consumer);
        }
    }

    /**
     * Subscribing with a zero receiver queue to a partitioned topic is expected to fail with a
     * {@link PulsarClientException}.
     */
    @Test(expectedExceptions = {PulsarClientException.class})
    public void zeroQueueSizePartitionedTopicInCompatibility() throws PulsarClientException, PulsarAdminException {
        String topicName = "persistent://prop/use/ns-abc/topic-zeroQueueSizePartitionedTopicInCompatibility";
        String subscriptionName = "my-ex-subscription-zeroQueueSizePartitionedTopicInCompatibility";
        int numberOfPartitions = 3;

        this.admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions);
        ConsumerConfiguration configuration = new ConsumerConfiguration();
        configuration.setReceiverQueueSize(0);
        this.pulsarClient.subscribe(topicName, subscriptionName, configuration);
    }

    /** A single zero-queue consumer receives every published message in publish order. */
    @Test
    public void zeroQueueSizeNormalConsumer() throws PulsarClientException {
        String topicName = "persistent://prop/use/ns-abc/topic-nonZeroQueueSizeNormalConsumer";
        String subscriptionName = "my-ex-subscription-nonZeroQueueSizeNormalConsumer";
        String messagePrefix = "my-message-nonZeroQueueSizeNormalConsumer-";

        Producer producer = null;
        ConsumerImpl consumer = null;
        try {
            producer = this.pulsarClient.createProducer(topicName);
            ConsumerConfiguration configuration = new ConsumerConfiguration();
            configuration.setReceiverQueueSize(0);
            // Subscribe before publishing, as the original test does.
            consumer = (ConsumerImpl) this.pulsarClient.subscribe(topicName, subscriptionName, configuration);

            for (int i = 0; i < totalMessages; i++) {
                String message = messagePrefix + i;
                log.info("Producer produced: {}", message);
                producer.send(message.getBytes());
            }

            for (int i = 0; i < totalMessages; i++) {
                receiveAndVerify(consumer, messagePrefix + i);
            }
        } finally {
            closeAll(producer, consumer);
        }
    }

    /**
     * Four zero-queue consumers on one shared subscription, polled in rotation, together receive
     * every published message in publish order.
     */
    @Test
    public void zeroQueueSizeSharedSubscription() throws PulsarClientException {
        String topicName = "persistent://prop/use/ns-abc/topic-zeroQueueSizeSharedSubscription";
        String subscriptionName = "my-ex-subscription-zeroQueueSizeSharedSubscription";
        String messagePrefix = "my-message-zeroQueueSizeSharedSubscription-";
        int consumerCount = 4;

        Producer producer = null;
        ConsumerImpl[] consumers = new ConsumerImpl[consumerCount];
        try {
            producer = this.pulsarClient.createProducer(topicName);
            ConsumerConfiguration configuration = new ConsumerConfiguration();
            configuration.setReceiverQueueSize(0);
            configuration.setSubscriptionType(SubscriptionType.Shared);
            for (int i = 0; i < consumerCount; i++) {
                consumers[i] = (ConsumerImpl) this.pulsarClient.subscribe(topicName, subscriptionName, configuration);
            }

            for (int i = 0; i < totalMessages; i++) {
                producer.send((messagePrefix + i).getBytes());
            }

            for (int i = 0; i < totalMessages; i++) {
                receiveAndVerify(consumers[i % consumerCount], messagePrefix + i);
            }
        } finally {
            // Slots that were never filled are still null; closeAll skips them.
            closeAll(producer, consumers);
        }
    }

    /**
     * With a failover subscription, the first consumer reads the first half of the messages
     * without acknowledging them and is then closed; the second consumer must then receive the
     * same messages again, starting from the first one.
     */
    @Test
    public void zeroQueueSizeFailoverSubscription() throws PulsarClientException {
        String topicName = "persistent://prop/use/ns-abc/topic-zeroQueueSizeFailoverSubscription";
        String subscriptionName = "my-ex-subscription-zeroQueueSizeFailoverSubscription";
        String messagePrefix = "my-message-zeroQueueSizeFailoverSubscription-";
        int halfOfMessages = totalMessages / 2;

        Producer producer = null;
        ConsumerImpl firstConsumer = null;
        ConsumerImpl secondConsumer = null;
        try {
            producer = this.pulsarClient.createProducer(topicName);
            ConsumerConfiguration configuration = new ConsumerConfiguration();
            configuration.setReceiverQueueSize(0);
            configuration.setSubscriptionType(SubscriptionType.Failover);
            configuration.setConsumerName("consumer-1");
            firstConsumer = (ConsumerImpl) this.pulsarClient.subscribe(topicName, subscriptionName, configuration);
            configuration.setConsumerName("consumer-2");
            secondConsumer = (ConsumerImpl) this.pulsarClient.subscribe(topicName, subscriptionName, configuration);

            for (int i = 0; i < totalMessages; i++) {
                producer.send((messagePrefix + i).getBytes());
            }

            for (int i = 0; i < halfOfMessages; i++) {
                receiveAndVerify(firstConsumer, messagePrefix + i);
            }

            firstConsumer.redeliverUnacknowledgedMessages();
            firstConsumer.close();
            // Already closed above; clear the reference so the finally block does not close it twice.
            firstConsumer = null;

            for (int i = 0; i < halfOfMessages; i++) {
                receiveAndVerify(secondConsumer, messagePrefix + i);
            }
        } finally {
            closeAll(producer, firstConsumer, secondConsumer);
        }
    }

    /**
     * Publishes batched messages to a topic that has a zero-queue consumer and registers a
     * callback that treats a successfully received message as a failure.
     */
    @Test
    public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
        String topicName = "persistent://prop-xyz/use/ns-abc/topic1";
        int batchMessageDelayMs = 100;

        Producer producer = null;
        Consumer consumer = null;
        try {
            ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
            consumerConfiguration.setSubscriptionType(SubscriptionType.Shared);
            consumerConfiguration.setReceiverQueueSize(0);
            consumer = this.pulsarClient.subscribe(topicName, "my-subscriber-name", consumerConfiguration);

            ProducerConfiguration producerConfiguration = new ProducerConfiguration();
            producerConfiguration.setBatchingEnabled(true);
            producerConfiguration.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerConfiguration.setBatchingMaxMessages(5);
            producer = this.pulsarClient.createProducer(topicName, producerConfiguration);

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // The test expects the receive to complete with an error rather than with a message.
            // NOTE(review): a failure raised inside this callback only affects the future returned
            // by handle(); nothing waits on that future, so the failure never reaches the test
            // thread. Consider joining on it once the expected completion behaviour is confirmed.
            consumer.receiveAsync().handle((message, error) -> {
                if (error == null) {
                    Assert.fail("zero receiver queue size must not deliver a batched message");
                }
                return null;
            });
        } finally {
            closeAll(producer, consumer);
        }
    }

    /**
     * Receives one message and checks its payload, asserting before and after the receive that
     * the consumer holds no queued messages.
     *
     * @param consumer the zero-queue consumer to read from
     * @param expectedPayload the text the next message must carry
     * @throws PulsarClientException if the receive call fails
     */
    private static void receiveAndVerify(ConsumerImpl consumer, String expectedPayload)
            throws PulsarClientException {
        Assert.assertEquals(consumer.numMessagesInQueue(), 0);
        Message message = consumer.receive();
        String payload = new String(message.getData());
        Assert.assertEquals(payload, expectedPayload);
        Assert.assertEquals(consumer.numMessagesInQueue(), 0);
        log.info("Consumer received : {}", payload);
    }

    /**
     * Closes the given consumers and then the producer, skipping any that are {@code null}.
     *
     * @param producer the producer to close, or {@code null} if none was created
     * @param consumers the consumers to close; individual entries may be {@code null}
     * @throws PulsarClientException if closing any of the resources fails
     */
    private static void closeAll(Producer producer, Consumer... consumers) throws PulsarClientException {
        try {
            for (Consumer consumer : consumers) {
                if (consumer != null) {
                    consumer.close();
                }
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
