package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/TopicsConsumerImplTest.class */
public class TopicsConsumerImplTest extends ProducerConsumerBase {
    /** Value used for {@code @Test(timeOut = ...)} on the tests in this class (TestNG interprets it as milliseconds). */
    private static final long testTimeout = 90000;
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
    /** Ack timeout, 2 seconds expressed in milliseconds, passed to every consumer builder in this class. */
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);

    /**
     * Runs before every test method and delegates to the base class's {@code internalSetup()}.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
    }

    /**
     * Runs after every test method and delegates to the base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that one consumer cannot subscribe to topics that live in different namespaces.
     *
     * <p>The topic list spans {@code ns-abc1}, {@code ns-abc2} and {@code ns-abc3}; building and
     * subscribing the consumer is expected to throw {@link IllegalArgumentException}.
     *
     * @throws Exception if tenant/topic creation fails, or if subscribing fails with anything other
     *     than the expected exception
     */
    @Test(timeOut = testTimeout)
    public void testDifferentTopicsNameSubscribe() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsFromDifferentNamespace";
        final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-TopicsFromDifferentNamespace";
        final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-TopicsFromDifferentNamespace";
        final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-TopicsFromDifferentNamespace";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        try {
            this.pulsarClient.newConsumer()
                    .topics(topicNames)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe for topics from different namespace should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: the mixed-namespace topic list was rejected.
        }
    }

    /**
     * Verifies the bookkeeping getters of a multi-topics consumer.
     *
     * <p>The consumer is subscribed to three topics: one that is not created as partitioned here,
     * one created with 2 partitions and one created with 3. The test checks that the first six
     * entries of {@code getPartitionedTopics()} and {@code getConsumers()} line up by topic name,
     * and that {@code getTopics()} reports the three subscribed topics.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = testTimeout)
    public void testGetConsumersAndGetTopics() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsConsumerGet";
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-TopicsConsumerGet";
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-TopicsConsumerGet";
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-TopicsConsumerGet";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        // Topics are supplied through both topics(List) and topic(String...) on the same builder.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .topic(topicName3)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        List<String> partitionNames = multiTopicsConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> partitionConsumers = multiTopicsConsumer.getConsumers();
        partitionNames.forEach(name -> log.info("topic: {}", name));
        partitionConsumers.forEach(c -> log.info("consumer: {}", c.getTopic()));

        // 1 + 2 + 3 = 6 entries are compared; both lists must agree position by position.
        for (int i = 0; i < 6; i++) {
            Assert.assertEquals(partitionConsumers.get(i).getTopic(), partitionNames.get(i));
        }
        Assert.assertEquals(multiTopicsConsumer.getTopics().size(), 3);

        consumer.unsubscribe();
        consumer.close();
    }

    /**
     * Verifies synchronous publish and consume across three topics with one multi-topics consumer.
     *
     * <p>Three producers each send 10 messages synchronously; the consumer must then receive and
     * acknowledge exactly 30 messages, each an instance of {@code TopicMessageImpl}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = testTimeout)
    public void testSyncProducerAndConsumer() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsConsumerSyncTest";
        final String messagePrefix = "my-message-TopicsConsumerSyncTest-";
        final int messagesPerProducer = 10;
        final int totalMessages = 3 * messagesPerProducer;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-TopicsConsumerSyncTest";
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-TopicsConsumerSyncTest";
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-TopicsConsumerSyncTest";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-" + i).getBytes());
            producer2.send((messagePrefix + "producer2-" + i).getBytes());
            producer3.send((messagePrefix + "producer3-" + i).getBytes());
        }

        // The first receive blocks; later ones give up after 500 ms, which ends the drain loop.
        int receivedCount = 0;
        Message<byte[]> message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals(receivedCount, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies asynchronous publish and consume across three topics with one multi-topics consumer.
     *
     * <p>Three producers each publish 10 messages with {@code sendAsync}; once all sends complete,
     * 30 {@code receiveAsync} calls are issued from a single worker thread. Each successfully
     * received message is acknowledged and counts down a latch, and the test waits for the latch
     * to reach zero.
     *
     * @throws Exception if any admin or client operation fails, or if waiting is interrupted
     */
    @Test(timeOut = testTimeout)
    public void testAsyncConsumer() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsConsumerAsyncTest";
        final String messagePrefix = "my-message-TopicsConsumerAsyncTest-";
        final int messagesPerProducer = 10;
        final int totalMessages = 3 * messagesPerProducer;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-TopicsConsumerAsyncTest";
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-TopicsConsumerAsyncTest";
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-TopicsConsumerAsyncTest";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        List<Future<?>> sendFutures = new ArrayList<>();
        for (int i = 0; i < messagesPerProducer; i++) {
            sendFutures.add(producer1.sendAsync((messagePrefix + "producer1-" + i).getBytes()));
            sendFutures.add(producer2.sendAsync((messagePrefix + "producer2-" + i).getBytes()));
            sendFutures.add(producer3.sendAsync((messagePrefix + "producer3-" + i).getBytes()));
        }

        log.info("Waiting for async publish to complete : {}", sendFutures.size());
        for (Future<?> sendFuture : sendFutures) {
            sendFuture.get();
        }

        log.info("start async consume");
        CountDownLatch latch = new CountDownLatch(totalMessages);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.execute(() -> IntStream.range(0, totalMessages).forEach(index -> {
                consumer.receiveAsync().thenAccept(message -> {
                    Assert.assertTrue(message instanceof TopicMessageImpl);
                    try {
                        consumer.acknowledge(message);
                    } catch (PulsarClientException e) {
                        Assert.fail("message acknowledge failed", e);
                    }
                    latch.countDown();
                    log.info("receive index: {}, latch countDown: {}", index, latch.getCount());
                }).exceptionally(failure -> {
                    // Trailing Throwable argument makes SLF4J log the stack trace.
                    log.warn("receive index: {}, failed receive message {}", index, failure.getMessage(), failure);
                    return null;
                });
            }));
            latch.await();
        } finally {
            // Release the worker thread even when the wait is interrupted or fails.
            executor.shutdownNow();
        }
        log.info("success latch wait");

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies un-acked message tracking and redelivery on a multi-topics consumer.
     *
     * <p>The test runs in phases, each publishing 10 messages from each of three producers:
     * <ol>
     *   <li>receive 30 messages without acknowledging and expect 30 tracked un-acked messages;</li>
     *   <li>receive again, acknowledge, and expect 30 distinct payloads and an empty tracker;</li>
     *   <li>publish a second round, receive and acknowledge 30, and expect an empty tracker;</li>
     *   <li>call {@code toggle()} on the trackers, publish a third round, receive without
     *       acknowledging (30 tracked), sleep for the ack timeout, then receive and acknowledge
     *       30 messages and expect an empty tracker.</li>
     * </ol>
     *
     * @throws Exception if any admin or client operation fails, or if sleeping is interrupted
     */
    @Test(timeOut = testTimeout)
    public void testConsumerUnackedRedelivery() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsConsumerRedeliveryTest";
        final String messagePrefix = "my-message-TopicsConsumerRedeliveryTest-";
        final int messagesPerProducer = 10;
        final int totalMessages = 3 * messagesPerProducer;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-TopicsConsumerRedeliveryTest";
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-TopicsConsumerRedeliveryTest";
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-TopicsConsumerRedeliveryTest";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // Phase 1: publish the first round and receive everything WITHOUT acknowledging.
        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-" + i).getBytes());
            producer2.send((messagePrefix + "producer2-" + i).getBytes());
            producer3.send((messagePrefix + "producer3-" + i).getBytes());
        }
        Message<byte[]> message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            log.debug("Consumer received : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        long trackerSize = multiTopicsConsumer.getUnAckedMessageTracker().size();
        log.debug("TopicsConsumerRedeliveryTest Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 30L);

        // Phase 2: the un-acked messages must arrive again; acknowledge them this time.
        Set<String> redeliveredPayloads = new HashSet<>();
        message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            redeliveredPayloads.add(new String(message.getData()));
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        trackerSize = multiTopicsConsumer.getUnAckedMessageTracker().size();
        log.debug("TopicsConsumerRedeliveryTest Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 0L);
        Assert.assertEquals(redeliveredPayloads.size(), totalMessages);

        // Phase 3: a fresh round that is acknowledged immediately leaves the tracker empty.
        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-round2" + i).getBytes());
            producer2.send((messagePrefix + "producer2-round2" + i).getBytes());
            producer3.send((messagePrefix + "producer3-round2" + i).getBytes());
        }
        int receivedCount = 0;
        message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            log.debug("Consumer received : {}", new String(message.getData()));
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        trackerSize = multiTopicsConsumer.getUnAckedMessageTracker().size();
        log.debug("TopicsConsumerRedeliveryTest Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 0L);
        Assert.assertEquals(receivedCount, totalMessages);

        // Phase 4: toggle the trackers of the parent consumer and every per-partition consumer.
        // NOTE(review): the exact effect of toggle() is defined in UnAckedMessageTracker - confirm there.
        multiTopicsConsumer.getUnAckedMessageTracker().toggle();
        multiTopicsConsumer.getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());

        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-round3" + i).getBytes());
            producer2.send((messagePrefix + "producer2-round3" + i).getBytes());
            producer3.send((messagePrefix + "producer3-round3" + i).getBytes());
        }
        // Receive the third round without acknowledging so that every message stays tracked.
        message = consumer.receive();
        while (message != null) {
            log.debug("Consumer received : {}", new String(message.getData()));
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        trackerSize = multiTopicsConsumer.getUnAckedMessageTracker().size();
        log.debug("TopicsConsumerRedeliveryTest Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 30L);

        // Wait one ack-timeout period before expecting the third round to be delivered again.
        Thread.sleep(this.ackTimeOutMillis);

        receivedCount = 0;
        message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            log.debug("Consumer received : {}", new String(message.getData()));
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals(receivedCount, totalMessages);
        trackerSize = multiTopicsConsumer.getUnAckedMessageTracker().size();
        log.info("TopicsConsumerRedeliveryTest Unacked Message Tracker size is {}", trackerSize);
        Assert.assertEquals(trackerSize, 0L);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies removing and re-adding a single topic on a live multi-topics consumer.
     *
     * <p>Phases, each publishing 10 messages from each of three producers:
     * <ol>
     *   <li>with all three topics subscribed, 30 messages are received;</li>
     *   <li>after {@code unsubscribeAsync} of the 3-partition topic, only 20 messages are
     *       received and the consumer reports 3 partitions, 3 consumers and 2 topics;</li>
     *   <li>after {@code subscribeAsync} of that topic again, 30 messages are received and the
     *       consumer reports 6 partitions, 6 consumers and 3 topics.</li>
     * </ol>
     *
     * <p>The test carries the same {@code timeOut} as its siblings because it makes blocking
     * {@code receive()} calls that would otherwise hang the suite if delivery stalled.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = testTimeout)
    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        final String messagePrefix = "my-message-TopicsConsumerSubscribeUnsubscribeSingleTopicTest-";
        final int messagesPerProducer = 10;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // Phase 1: all three topics subscribed.
        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-" + i).getBytes());
            producer2.send((messagePrefix + "producer2-" + i).getBytes());
            producer3.send((messagePrefix + "producer3-" + i).getBytes());
        }
        int receivedCount = 0;
        Message<byte[]> message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals(receivedCount, 30);

        // Phase 2: drop the 3-partition topic; its producer keeps sending but nothing should arrive.
        multiTopicsConsumer.unsubscribeAsync(topicName3).get();
        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-round2" + i).getBytes());
            producer2.send((messagePrefix + "producer2-round2" + i).getBytes());
            producer3.send((messagePrefix + "producer3-round2" + i).getBytes());
        }
        receivedCount = 0;
        message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals(receivedCount, 20);

        List<String> partitionNames = multiTopicsConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> partitionConsumers = multiTopicsConsumer.getConsumers();
        Assert.assertEquals(partitionNames.size(), 3);
        Assert.assertEquals(partitionConsumers.size(), 3);
        Assert.assertEquals(multiTopicsConsumer.getTopics().size(), 2);

        // Phase 3: add the topic back and expect full delivery again.
        multiTopicsConsumer.subscribeAsync(topicName3).get();
        for (int i = 0; i < messagesPerProducer; i++) {
            producer1.send((messagePrefix + "producer1-round3" + i).getBytes());
            producer2.send((messagePrefix + "producer2-round3" + i).getBytes());
            producer3.send((messagePrefix + "producer3-round3" + i).getBytes());
        }
        receivedCount = 0;
        message = consumer.receive();
        while (message != null) {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            receivedCount++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals(receivedCount, 30);

        partitionNames = multiTopicsConsumer.getPartitionedTopics();
        partitionConsumers = multiTopicsConsumer.getConsumers();
        Assert.assertEquals(partitionNames.size(), 6);
        Assert.assertEquals(partitionConsumers.size(), 6);
        Assert.assertEquals(multiTopicsConsumer.getTopics().size(), 3);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies that the consumer builder rejects a subscription that names no topic.
     *
     * <p>Four variants are exercised, each expected to throw:
     * <ul>
     *   <li>no topic call at all: {@link PulsarClientException};</li>
     *   <li>{@code topic} with an empty array: {@link IllegalArgumentException};</li>
     *   <li>{@code topics(null)}: {@link IllegalArgumentException};</li>
     *   <li>{@code topics} with an empty list: {@link IllegalArgumentException}.</li>
     * </ul>
     *
     * @throws Exception if tenant/topic creation fails, or if a variant fails with an unexpected
     *     exception type
     */
    @Test(timeOut = testTimeout)
    public void testTopicsNameSubscribeWithBuilderFail() throws Exception {
        final String subscriptionName = "my-ex-subscription-TopicsNameSubscribeWithBuilder";

        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(
                "persistent://prop/use/ns-abc/topic-2-TopicsNameSubscribeWithBuilder", 2);
        this.admin.topics().createPartitionedTopic(
                "persistent://prop/use/ns-abc/topic-3-TopicsNameSubscribeWithBuilder", 3);

        // Variant 1: the builder is never given a topic.
        try {
            this.pulsarClient.newConsumer()
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe1 with no topicName should fail.");
        } catch (PulsarClientException expected) {
            // Expected outcome: subscribing without any topic was rejected.
        }

        // Variant 2: topic(String...) with an empty array.
        try {
            this.pulsarClient.newConsumer()
                    .topic(new String[0])
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe2 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: an empty topic array was rejected.
        }

        // Variant 3: topics(List) with a null list.
        try {
            this.pulsarClient.newConsumer()
                    .topics((List<String>) null)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe3 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: a null topic list was rejected.
        }

        // Variant 4: topics(List) with an empty list.
        try {
            this.pulsarClient.newConsumer()
                    .topics(new ArrayList<String>())
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe4 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: an empty topic list was rejected.
        }
    }
}
