package org.apache.rocketmq.test.smoke;

import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.class */
/**
 * Integration test that synchronously sends a fixed number of messages to every publish queue of
 * a freshly initialised topic, waits for the consumer's listener to receive them, and then
 * checks the consumer and broker offsets reported by the admin tool.
 */
public class NormalMessageSendAndRecvIT extends BaseConf {
    private static final Logger logger = LoggerFactory.getLogger(NormalMessageSendAndRecvIT.class);

    /** Number of messages sent to each message queue; also the offset expected per queue afterwards. */
    private static final int MESSAGES_PER_QUEUE = 10;

    /** Timeout, in milliseconds, passed to the listener while waiting for the sent messages. */
    private static final int CONSUME_TIMEOUT_MILLIS = 120000;

    /** Upper bound for waiting until the consume stats cover every publish queue of the topic. */
    private static final Duration READY_TIMEOUT = Duration.ofSeconds(120L);

    private RMQNormalConsumer consumer;
    private RMQNormalProducer producer;
    private String topic;
    private String group;
    private DefaultMQAdminExt defaultMQAdminExt;

    /**
     * Initialises a topic and a consumer group, creates a producer and a consumer subscribed to
     * the topic with the {@code "*"} expression, and starts an admin client.
     *
     * @throws Exception if any of the clients fails to initialise or start
     */
    @Before
    public void setUp() throws Exception {
        topic = initTopic();
        group = initConsumerGroup();
        logger.info("use topic: {};", topic);
        producer = getProducer(NAMESRV_ADDR, topic);
        consumer = getConsumer(NAMESRV_ADDR, group, topic, "*", (AbstractListener) new RMQNormalListener());
        defaultMQAdminExt = getAdmin(NAMESRV_ADDR);
        defaultMQAdminExt.start();
    }

    /** Delegates cleanup to {@link BaseConf#shutdown()}. */
    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    /**
     * Sends {@link #MESSAGES_PER_QUEUE} messages to each publish queue of the topic, verifies that
     * the listener received exactly the sent bodies without a {@code POP_CK} property, then shuts
     * the consumer down and asserts that both the consumer offset and the broker offset of every
     * queue equal {@link #MESSAGES_PER_QUEUE}.
     *
     * @throws Exception if sending, consuming, or querying the consume stats fails
     */
    @Test
    public void testSynSendMessage() throws Exception {
        AtomicReference<List<MessageQueue>> messageQueues = new AtomicReference<>();
        AtomicReference<ConsumeStats> consumeStats = new AtomicReference<>();

        // Wait until the consume stats of the group cover every publish queue of the topic.
        Awaitility.await().atMost(READY_TIMEOUT).until(() -> {
            try {
                consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
                messageQueues.set(producer.getProducer().fetchPublishMessageQueues(topic));
                return !messageQueues.get().isEmpty()
                    && consumeStats.get() != null
                    && consumeStats.get().getOffsetTable().keySet().containsAll(messageQueues.get());
            } catch (MQClientException e) {
                logger.debug("Exception raised while checking producer and consumer are started", e);
                return false;
            }
        });

        List<MessageQueue> queues = messageQueues.get();
        for (MessageQueue queue : queues) {
            producer.send(MESSAGES_PER_QUEUE, queue);
        }
        Assert.assertEquals("Not all sent succeeded",
            MESSAGES_PER_QUEUE * queues.size(), producer.getAllUndupMsgBody().size());

        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_TIMEOUT_MILLIS);
        Truth.assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()))
            .containsExactlyElementsIn(producer.getAllMsgBody());
        for (Object received : consumer.getListener().getAllOriginMsg()) {
            Truth.assertThat(((MessageClientExt) received).getProperty("POP_CK")).isNull();
        }

        // The consumer is shut down before the stats are fetched again for the offset checks.
        consumer.getConsumer().shutdown();
        consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
        Map<MessageQueue, OffsetWrapper> offsetTable = consumeStats.get().getOffsetTable();
        for (MessageQueue queue : queues) {
            Assert.assertTrue(offsetTable.containsKey(queue));
            OffsetWrapper offsets = offsetTable.get(queue);
            Assert.assertEquals(MESSAGES_PER_QUEUE, offsets.getConsumerOffset());
            Assert.assertEquals(MESSAGES_PER_QUEUE, offsets.getBrokerOffset());
        }
    }

    /**
     * Runs {@link #testSynSendMessage()} with {@code enableBuildConsumeQueueConcurrently} switched
     * on for all three brokers, and switches it off again afterwards.
     *
     * @throws Exception if the wrapped test or a broker restart fails
     */
    @Test
    public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception {
        resetStoreConfigWithEnableBuildConsumeQueueConcurrently(true);
        try {
            testSynSendMessage();
        } finally {
            // The brokers are static and shared: restore the flag even when the test fails so
            // that later tests do not run against the modified store configuration.
            resetStoreConfigWithEnableBuildConsumeQueueConcurrently(false);
        }
    }

    /**
     * Restarts the three brokers with the given {@code enableBuildConsumeQueueConcurrently}
     * setting. Each broker is shut down, its existing store config is updated, and a new broker
     * is created and started from the same store and broker config objects. The shared broker
     * list and the name-to-broker map are then rebuilt from the new controllers.
     *
     * @param z value to apply to {@code enableBuildConsumeQueueConcurrently} on every broker
     */
    void resetStoreConfigWithEnableBuildConsumeQueueConcurrently(boolean z) {
        brokerController1.shutdown();
        MessageStoreConfig storeConfig1 = brokerController1.getMessageStoreConfig();
        BrokerConfig brokerConfig1 = brokerController1.getBrokerConfig();
        storeConfig1.setEnableBuildConsumeQueueConcurrently(z);
        brokerController1 = IntegrationTestBase.createAndStartBroker(storeConfig1, brokerConfig1);

        brokerController2.shutdown();
        MessageStoreConfig storeConfig2 = brokerController2.getMessageStoreConfig();
        BrokerConfig brokerConfig2 = brokerController2.getBrokerConfig();
        storeConfig2.setEnableBuildConsumeQueueConcurrently(z);
        brokerController2 = IntegrationTestBase.createAndStartBroker(storeConfig2, brokerConfig2);

        brokerController3.shutdown();
        MessageStoreConfig storeConfig3 = brokerController3.getMessageStoreConfig();
        BrokerConfig brokerConfig3 = brokerController3.getBrokerConfig();
        storeConfig3.setEnableBuildConsumeQueueConcurrently(z);
        brokerController3 = IntegrationTestBase.createAndStartBroker(storeConfig3, brokerConfig3);

        brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
        brokerControllerMap = brokerControllerList.stream()
            .collect(Collectors.toMap(broker -> broker.getBrokerConfig().getBrokerName(), Function.identity()));
    }
}
