package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/PartitionedProducerConsumerTest.class */
public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
    private ExecutorService executor;

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("PartitionedProducerConsumerTest"));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
        this.executor.shutdown();
    }

    @Test(timeOut = 30000)
    public void testRoundRobinProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString(), producerConfiguration);
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-partitioned-subscriber", consumerConfiguration);
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Message receive = subscribe.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(receive, "Message should not be null");
            subscribe.acknowledge(receive);
            String str = new String(receive.getData());
            log.debug("Received message: [{}]", str);
            Assert.assertTrue(newHashSet.add(str), "Message " + str + " already received");
        }
        createProducer.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testPartitionedTopicNameWithSpecialCharacter() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic1! * ' ( ) ; : @ & = + $ , /\\ ? % # [ ]");
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        this.pulsarClient.createProducer(destinationName.toString(), producerConfiguration).close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testSinglePartitionProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic2");
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.SinglePartition);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString(), producerConfiguration);
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-partitioned-subscriber", consumerConfiguration);
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Message receive = subscribe.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(receive, "Message should not be null");
            subscribe.acknowledge(receive);
            String str = new String(receive.getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        createProducer.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testKeyBasedProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic3");
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString());
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-partitioned-subscriber", consumerConfiguration);
        for (int i = 0; i < 5; i++) {
            createProducer.send(MessageBuilder.create().setContent(("my-message-" + i).getBytes()).setKey("dummykey1").build());
        }
        for (int i2 = 5; i2 < 10; i2++) {
            createProducer.send(MessageBuilder.create().setContent(("my-message-" + i2).getBytes()).setKey("dummykey2").build());
        }
        HashSet newHashSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            Message receive = subscribe.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(receive, "Message should not be null");
            subscribe.acknowledge(receive);
            String str = new String(receive.getData());
            log.debug("Received message: [{}]", str);
            testKeyBasedOrder(newHashSet, str);
        }
        createProducer.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    private void testKeyBasedOrder(Set<String> set, String str) {
        int parseInt = Integer.parseInt(str.substring(str.lastIndexOf(45) + 1));
        if (parseInt != 0 && parseInt != 5) {
            Assert.assertTrue(set.contains("my-message-" + (parseInt - 1)), "Message my-message-" + (parseInt - 1) + " should come before my-message-" + parseInt);
        }
        Assert.assertTrue(set.add(str), "Received duplicate message " + str);
    }

    @Test(timeOut = 30000)
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic4");
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-subscriber-name", consumerConfiguration);
        try {
            subscribe.acknowledge(MessageBuilder.create().setContent("InvalidMessage".getBytes()).build());
        } catch (PulsarClientException.InvalidMessageException e) {
        }
        subscribe.close();
        try {
            subscribe.receive();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e2) {
        }
        try {
            subscribe.unsubscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e3) {
        }
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString());
        createProducer.close();
        try {
            createProducer.send("message".getBytes());
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e4) {
        }
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
    }

    @Test(timeOut = 30000)
    public void testSillyUser() throws Exception {
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic5");
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        Producer producer = null;
        Consumer consumer = null;
        try {
            producerConfiguration.setMessageRouter((MessageRouter) null);
            Assert.fail("should fail");
        } catch (NullPointerException e) {
        }
        try {
            producerConfiguration.setMessageRoutingMode((ProducerConfiguration.MessageRoutingMode) null);
            Assert.fail("should fail");
        } catch (NullPointerException e2) {
        }
        try {
            producer = this.pulsarClient.createProducer(destinationName.toString(), (ProducerConfiguration) null);
            Assert.fail("should fail");
        } catch (PulsarClientException e3) {
            Assert.assertTrue(e3 instanceof PulsarClientException.InvalidConfigurationException);
        }
        new ConsumerConfiguration();
        try {
            consumer = this.pulsarClient.subscribe(destinationName.toString(), "my-subscriber-name", (ConsumerConfiguration) null);
            Assert.fail("Should fail");
        } catch (PulsarClientException e4) {
            Assert.assertTrue(e4 instanceof PulsarClientException.InvalidConfigurationException);
        }
        try {
            try {
                producer = this.pulsarClient.createProducer(destinationName.toString());
                consumer = this.pulsarClient.subscribe(destinationName.toString(), "my-sub");
                producer.send("message1".getBytes());
                producer.send("message2".getBytes());
                consumer.receive();
                consumer.acknowledgeCumulative(consumer.receive());
                Assert.fail("should fail since ack cumulative is not supported for partitioned topic");
                producer.close();
                consumer.unsubscribe();
                consumer.close();
            } catch (PulsarClientException e5) {
                Assert.assertTrue(e5 instanceof PulsarClientException.NotSupportedException);
                producer.close();
                consumer.unsubscribe();
                consumer.close();
            }
            this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        } catch (Throwable th) {
            producer.close();
            consumer.unsubscribe();
            consumer.close();
            throw th;
        }
    }

    @Test(timeOut = 30000)
    public void testDeletePartitionedTopic() throws Exception {
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic6");
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString());
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-sub");
        subscribe.unsubscribe();
        subscribe.close();
        createProducer.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        if (this.pulsarClient.createProducer(destinationName.toString()) instanceof PartitionedProducerImpl) {
            Assert.fail("should fail since partitioned topic was deleted");
        }
    }

    @Test(timeOut = 30000)
    public void testAsyncPartitionedProducerConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setSubscriptionType(SubscriptionType.Shared);
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString(), producerConfiguration);
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-partitioned-subscriber", consumerConfiguration);
        for (int i = 0; i < 100; i++) {
            String str = "my-message-" + i;
            newHashSet.add(str);
            createProducer.send(str.getBytes());
        }
        log.info(" start receiving messages :");
        CountDownLatch countDownLatch = new CountDownLatch(100);
        receiveAsync(subscribe, 100, 0, countDownLatch, newHashSet2, this.executor);
        countDownLatch.await();
        Assert.assertEquals(newHashSet.size(), 100);
        newHashSet.removeAll(newHashSet2);
        Assert.assertTrue(newHashSet.isEmpty());
        createProducer.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        DestinationName destinationName = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setReceiverQueueSize(1);
        this.admin.persistentTopics().createPartitionedTopic(destinationName.toString(), 4);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        Producer createProducer = this.pulsarClient.createProducer(destinationName.toString(), producerConfiguration);
        Consumer subscribe = this.pulsarClient.subscribe(destinationName.toString(), "my-partitioned-subscriber", consumerConfiguration);
        for (int i = 0; i < 100; i++) {
            String str = "my-message-" + i;
            newHashSet.add(str);
            createProducer.send(str.getBytes());
        }
        log.info(" start receiving messages :");
        CountDownLatch countDownLatch = new CountDownLatch(100);
        receiveAsync(subscribe, 100, 0, countDownLatch, newHashSet2, this.executor);
        countDownLatch.await();
        Assert.assertEquals(newHashSet.size(), 100);
        newHashSet.removeAll(newHashSet2);
        Assert.assertTrue(newHashSet.isEmpty());
        createProducer.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic(destinationName.toString());
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testFairDistributionForPartitionConsumers() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        consumerConfiguration.setReceiverQueueSize(10);
        this.admin.persistentTopics().createPartitionedTopic("persistent://my-property/use/my-ns/my-topic", 2);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.RoundRobinPartition);
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic-partition-0", producerConfiguration);
        Producer createProducer2 = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic-partition-1", producerConfiguration);
        Consumer subscribe = this.pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-partitioned-subscriber", consumerConfiguration);
        int i = 0;
        for (int i2 = 0; i2 < 9; i2++) {
            createProducer.send(("producer1-" + i2).getBytes());
        }
        Thread.sleep(1000L);
        for (int i3 = 0; i3 < 5; i3++) {
            createProducer2.send(("producer2-" + i3).getBytes());
        }
        for (int i4 = 0; i4 < 30; i4++) {
            i += new String(subscribe.receive().getData()).startsWith("producer2") ? 1 : 0;
            if (i4 >= 2) {
                createProducer.send("producer1".getBytes());
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(i >= 4);
        createProducer.close();
        createProducer2.close();
        subscribe.unsubscribe();
        subscribe.close();
        this.admin.persistentTopics().deletePartitionedTopic("persistent://my-property/use/my-ns/my-topic");
        log.info("-- Exiting {} test --", this.methodName);
    }

    private void receiveAsync(Consumer consumer, int i, int i2, CountDownLatch countDownLatch, Set<String> set, ExecutorService executorService) throws PulsarClientException {
        if (i2 < i) {
            consumer.receiveAsync().handle((message, th) -> {
                if (th != null) {
                    return null;
                }
                set.add(new String(message.getData()));
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    Assert.fail("message acknowledge failed", e);
                }
                executorService.execute(() -> {
                    try {
                        receiveAsync(consumer, i, i2 + 1, countDownLatch, set, executorService);
                    } catch (PulsarClientException e2) {
                        Assert.fail("message receive failed", e2);
                    }
                });
                countDownLatch.countDown();
                return null;
            });
        }
    }
}
