package org.apache.rocketmq.test.client.consumer.topic;

import com.google.common.truth.Truth;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.class */
public class MulConsumerMulTopicIT extends BaseConf {
    private RMQNormalProducer producer = null;

    @Before
    public void setUp() {
        this.producer = getProducer(NAMESRV_ADDR, null);
    }

    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    @Test
    public void testSynSendMessage() {
        String initTopic = initTopic();
        String initTopic2 = initTopic();
        RMQNormalConsumer consumer = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
        consumer.subscribe(initTopic2, "*");
        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer.getConsumerGroup(), initTopic, "*", (AbstractListener) new RMQNormalListener());
        consumer2.subscribe(initTopic2, "*");
        this.producer.send(MQMessageFactory.getMsg(initTopic, 10));
        this.producer.send(MQMessageFactory.getMsg(initTopic2, 10));
        Assert.assertEquals("Not all sent succeeded", 10 * 2, this.producer.getAllUndupMsgBody().size());
        Truth.assertThat(Boolean.valueOf(MQWait.waitConsumeAll(120000, this.producer.getAllMsgBody(), new AbstractListener[]{consumer.getListener(), consumer2.getListener()}))).isEqualTo(true);
    }

    @Test
    public void testConsumeWithDiffTag() {
        String initTopic = initTopic();
        String initTopic2 = initTopic();
        RMQNormalConsumer consumer = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
        consumer.subscribe(initTopic2, "jueyin_tag");
        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer.getConsumerGroup(), initTopic, "*", (AbstractListener) new RMQNormalListener());
        consumer2.subscribe(initTopic2, "jueyin_tag");
        this.producer.send(MQMessageFactory.getMsg(initTopic, 10));
        this.producer.send(MQMessageFactory.getMsg(initTopic2, 10, "jueyin_tag"));
        Assert.assertEquals("Not all sent succeeded", 10 * 2, this.producer.getAllUndupMsgBody().size());
        Truth.assertThat(Boolean.valueOf(MQWait.waitConsumeAll(120000, this.producer.getAllMsgBody(), new AbstractListener[]{consumer.getListener(), consumer2.getListener()}))).isEqualTo(true);
    }

    @Test
    public void testConsumeWithDiffTagAndFilter() {
        String initTopic = initTopic();
        String initTopic2 = initTopic();
        RMQNormalConsumer consumer = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
        consumer.subscribe(initTopic2, "jueyin_tag_1");
        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
        consumer2.subscribe(initTopic2, "jueyin_tag_1");
        this.producer.send(MQMessageFactory.getMsg(initTopic2, 10, "jueyin_tag_2"));
        this.producer.clearMsg();
        this.producer.send(MQMessageFactory.getMsg(initTopic, 10));
        this.producer.send(MQMessageFactory.getMsg(initTopic2, 10, "jueyin_tag_1"));
        Truth.assertThat(Boolean.valueOf(MQWait.waitConsumeAll(120000, this.producer.getAllMsgBody(), new AbstractListener[]{consumer.getListener(), consumer2.getListener()}))).isEqualTo(true);
    }
}
