package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for consumers that subscribe through a topics {@link Pattern}.
 *
 * <p>The tests cover three areas: builder validation (a pattern cannot be combined with explicit
 * topics or a second pattern), the topics a pattern consumer ends up subscribed to for each
 * {@code subscriptionTopicsMode}, and how the subscribed set changes when the pattern recheck
 * task is run by hand after topics appear or disappear. Two pure helper methods,
 * {@code PulsarClientImpl.topicsPatternFilter} and
 * {@code PatternMultiTopicsConsumerImpl.topicsListsMinus}, are tested directly.
 */
public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
    /** Per-test TestNG timeout, in milliseconds. */
    private static final long TEST_TIMEOUT_MS = 90000;
    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplTest.class);

    /** Ack timeout handed to every consumer builder in this class (2 seconds, in milliseconds). */
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);

    /** Enables TCP lookup, then runs the base-class setup followed by the producer/consumer base setup. */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        isTcpLookup = true;
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Delegates to the base-class cleanup after every test method. */
    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that the consumer builder rejects a topics pattern combined with a single topic,
     * with a topic list, or with a second (string) pattern.
     *
     * @throws Exception if tenant or topic creation through the admin client fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
        String key = "PatternTopicsSubscribeWithBuilderFail";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
        String patternString = "persistent://my-property/my-ns/pattern-topic.*";
        Pattern pattern = Pattern.compile(patternString);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        // Pattern + single topic.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topic(topicName1)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe1 with pattern and topic should fail.");
        } catch (PulsarClientException expected) {
            // Expected: the failure is the behaviour under test.
        }

        // Pattern + topic list.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topics(topicNames)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe2 with pattern and topics should fail.");
        } catch (PulsarClientException expected) {
            // Expected: the failure is the behaviour under test.
        }

        // Pattern + second pattern given as a string.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topicsPattern(patternString)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe3 with pattern and patternString should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected: the failure is the behaviour under test.
        }
    }

    /**
     * Subscribes by pattern without setting a topics mode and checks that only the three
     * persistent topics (six partitions/consumers) are picked up: 40 messages are published across
     * four producers, and the 30 sent to persistent topics are expected back.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        int totalMessages = 30;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer4 = pulsarClient.newProducer()
                .topic(topicName4)
                .enableBatching(false)
                .create();

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 6, 3);
        assertPartitionsMatchConsumers(consumer);

        for (int i = 0; i < totalMessages / 3; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
            send(producer4, messagePredicate + "producer4-" + i);
        }

        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Subscribes by pattern in {@code NON_PERSISTENT} mode and checks that only the single
     * non-persistent topic is picked up: of the 40 messages published, only the 10 sent to that
     * topic are expected back.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/np-pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/np-pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/np-pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/np-pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/np-pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        int totalMessages = 40;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer4 = pulsarClient.newProducer()
                .topic(topicName4)
                .enableBatching(false)
                .create();

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 1, 1);
        assertPartitionsMatchConsumers(consumer);

        for (int i = 0; i < totalMessages / 4; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
            send(producer4, messagePredicate + "producer4-" + i);
        }

        // Only producer4 writes to the non-persistent topic, so a quarter of the messages arrive.
        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages / 4);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Subscribes by pattern in {@code ALL} mode and checks that persistent and non-persistent
     * topics are both picked up (seven partitions/consumers over four topics) and that all 40
     * published messages are received.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        int totalMessages = 40;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer4 = pulsarClient.newProducer()
                .topic(topicName4)
                .enableBatching(false)
                .create();

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 7, 4);
        assertPartitionsMatchConsumers(consumer);

        for (int i = 0; i < totalMessages / 4; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
            send(producer4, messagePredicate + "producer4-" + i);
        }

        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Checks {@code PulsarClientImpl.topicsPatternFilter} against a fixed list of four topic names:
     * a narrow pattern is expected to keep two of them and a namespace-wide pattern all four
     * (including the {@code non-persistent://} entry).
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testTopicsPatternFilter() throws Exception {
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
        String topicName3 = "persistent://my-property/my-ns/hello-3";
        String topicName4 = "non-persistent://my-property/my-ns/hello-4";
        List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);

        Pattern narrowPattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
        List<String> narrowResult = PulsarClientImpl.topicsPatternFilter(topicsNames, narrowPattern);
        Assert.assertTrue(narrowResult.size() == 2
                && narrowResult.contains(topicName1)
                && narrowResult.contains(topicName2));

        Pattern namespacePattern = Pattern.compile("persistent://my-property/my-ns/.*");
        List<String> namespaceResult = PulsarClientImpl.topicsPatternFilter(topicsNames, namespacePattern);
        Assert.assertTrue(namespaceResult.size() == 4
                && Stream.of(topicName1, topicName2, topicName3, topicName4)
                        .allMatch(namespaceResult::contains));
    }

    /**
     * Checks {@code PatternMultiTopicsConsumerImpl.topicsListsMinus} for overlapping, disjoint,
     * identical and empty operands.
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testTopicsListMinus() throws Exception {
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3";
        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4";
        String topicName5 = "persistent://my-property/my-ns/pattern-topic-5";
        String topicName6 = "persistent://my-property/my-ns/pattern-topic-6";
        List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
        List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);

        // Overlapping operands: each difference keeps the two names unique to the first list.
        List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
        List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
        Assert.assertTrue(addedNames.size() == 2
                && addedNames.contains(topicName5)
                && addedNames.contains(topicName6));
        Assert.assertTrue(removedNames.size() == 2
                && removedNames.contains(topicName1)
                && removedNames.contains(topicName2));

        // Disjoint operands: nothing is removed from the first list.
        List<String> disjointDiff = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
        Assert.assertTrue(disjointDiff.size() == 2
                && disjointDiff.contains(topicName5)
                && disjointDiff.contains(topicName6));

        // Identical operands: the difference is empty.
        List<String> emptyDiff = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
        Assert.assertEquals(emptyDiff.size(), 0);

        // Subtracting an empty list leaves the first list's contents unchanged.
        List<String> minusEmpty = PatternMultiTopicsConsumerImpl.topicsListsMinus(disjointDiff, emptyDiff);
        Assert.assertTrue(minusEmpty.size() == disjointDiff.size());
        minusEmpty.forEach(topic -> Assert.assertTrue(disjointDiff.contains(topic)));

        // Subtracting from an empty list yields an empty list.
        Assert.assertEquals(PatternMultiTopicsConsumerImpl.topicsListsMinus(emptyDiff, disjointDiff).size(), 0);
    }

    /**
     * Starts a pattern consumer before any producer exists (it is expected to hold zero topics),
     * then creates producers, triggers the pattern recheck by hand, and expects the consumer to hold
     * three topics / six partitions and to receive all 30 published messages.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testStartEmptyPatternConsumer() throws Exception {
        String key = "StartEmptyPatternConsumerTest";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 0, 0);

        int totalMessages = 30;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);

        recheckTopics(consumer);

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 6, 3);

        for (int i = 0; i < totalMessages / 3; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
        }

        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies that a running pattern consumer picks up a newly created four-partition topic after
     * the pattern recheck is triggered by hand (six partitions grow to ten) and then receives the
     * messages sent in a second round.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testAutoSubscribePatternConsumer() throws Exception {
        String key = "AutoSubscribePatternConsumer";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        int totalMessages = 30;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 6, 3);

        // Round 1: three producers, ten messages each.
        for (int i = 0; i < totalMessages / 3; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
        }
        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        // Add a fourth topic that matches the pattern, then force a recheck.
        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" + key;
        admin.topics().createPartitionedTopic(topicName4, 4);
        Producer<byte[]> producer4 = createProducer(topicName4, MessageRoutingMode.RoundRobinPartition);

        recheckTopics(consumer);
        assertTopology(consumer, 10, 4);

        // Round 2: fifteen messages each to an old topic and the new one.
        for (int i = 0; i < totalMessages / 2; i++) {
            send(producer3, messagePredicate + "round2-producer4-" + i);
            send(producer4, messagePredicate + "round2-producer4-" + i);
        }
        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Verifies that a running pattern consumer drops topics that are no longer listed: the
     * namespace service is stubbed to report only one topic, the recheck is triggered by hand, and
     * the consumer is expected to shrink from six partitions to two and still receive messages
     * from the remaining topic.
     *
     * <p>The method name keeps its historical spelling ("Unbubscribe") so existing references to
     * the test keep working.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testAutoUnbubscribePatternConsumer() throws Exception {
        String key = "AutoUnsubscribePatternConsumer";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        int totalMessages = 30;
        String messagePredicate = "my-message-" + key + "-";
        Producer<byte[]> producer1 = createProducer(topicName1, MessageRoutingMode.SinglePartition);
        Producer<byte[]> producer2 = createProducer(topicName2, MessageRoutingMode.RoundRobinPartition);
        Producer<byte[]> producer3 = createProducer(topicName3, MessageRoutingMode.RoundRobinPartition);

        PatternMultiTopicsConsumerImpl<byte[]> consumer = asPatternConsumer(pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe());

        Assert.assertSame(consumer.getPattern(), pattern);
        assertTopology(consumer, 6, 3);

        // Round 1: three producers, ten messages each.
        for (int i = 0; i < totalMessages / 3; i++) {
            send(producer1, messagePredicate + "producer1-" + i);
            send(producer2, messagePredicate + "producer2-" + i);
            send(producer3, messagePredicate + "producer3-" + i);
        }
        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        // Make the namespace service report only the second topic, then force a recheck.
        Mockito.doReturn(Lists.newArrayList(topicName2))
                .when(pulsar.getNamespaceService())
                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

        recheckTopics(consumer);
        assertTopology(consumer, 2, 1);

        // Round 2: all messages go to the one topic that is still subscribed.
        for (int i = 0; i < totalMessages; i++) {
            send(producer2, messagePredicate + "round2-producer2-" + i);
        }
        Assert.assertEquals(receiveAndAckAll(consumer), totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /** Creates a non-batching producer on {@code topic} with the given routing mode. */
    private Producer<byte[]> createProducer(String topic, MessageRoutingMode routingMode)
            throws PulsarClientException {
        return pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(routingMode)
                .create();
    }

    /** Asserts that {@code consumer} is a pattern consumer and returns it as that type. */
    private static PatternMultiTopicsConsumerImpl<byte[]> asPatternConsumer(Consumer<byte[]> consumer) {
        Assert.assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
        return (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
    }

    /** Sends {@code text} through {@code producer}, encoded as UTF-8. */
    private static void send(Producer<byte[]> producer, String text) throws PulsarClientException {
        producer.send(text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Asserts the partition count (checked against both the partitioned-topic list and the
     * per-partition consumer list) and the topic count of {@code consumer}.
     */
    private static void assertTopology(PatternMultiTopicsConsumerImpl<byte[]> consumer,
                                       int expectedPartitions, int expectedTopics) {
        Assert.assertEquals(consumer.getPartitionedTopics().size(), expectedPartitions);
        Assert.assertEquals(consumer.getConsumers().size(), expectedPartitions);
        Assert.assertEquals(consumer.getTopics().size(), expectedTopics);
    }

    /**
     * Logs the subscribed partitions, consumers and topics at debug level, and asserts that the
     * i-th partitioned topic name equals the topic of the i-th internal consumer.
     */
    private static void assertPartitionsMatchConsumers(PatternMultiTopicsConsumerImpl<byte[]> consumer) {
        List<String> partitions = consumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> partitionConsumers = consumer.getConsumers();

        partitions.forEach(topic -> log.debug("topic: {}", topic));
        partitionConsumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

        IntStream.range(0, partitions.size()).forEach(index ->
                Assert.assertEquals(partitionConsumers.get(index).getTopic(), partitions.get(index)));

        consumer.getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
    }

    /**
     * Runs the consumer's pattern recheck task once with its own recheck timeout, then pauses
     * for 100 ms before returning so that the caller's assertions run after the recheck has had
     * time to take effect.
     */
    private static void recheckTopics(PatternMultiTopicsConsumerImpl<byte[]> consumer) throws Exception {
        log.debug("recheck topics change");
        consumer.run(consumer.getRecheckPatternTimeout());
        Thread.sleep(100L);
    }

    /**
     * Receives and acknowledges messages until a 500 ms receive returns nothing.
     *
     * <p>The first receive has no timeout, so at least one message must arrive; the enclosing
     * test's TestNG timeout bounds the wait.
     *
     * @return the number of messages received and acknowledged
     */
    private static int receiveAndAckAll(Consumer<byte[]> consumer) throws PulsarClientException {
        int received = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            received++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData(), StandardCharsets.UTF_8));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        return received;
    }
}
