/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.smoke;

import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Smoke test that sends normal messages to every publish queue of a freshly
 * created topic, waits for a push consumer to receive them, and then checks
 * the consume statistics reported by the admin tool.
 *
 * <p>The scenario is run twice: once against the brokers as they are, and once
 * after restarting the three shared brokers with
 * {@code enableBuildConsumeQueueConcurrently} switched on.
 */
public class NormalMessageSendAndRecvIT
extends BaseConf {
    private static final Logger logger = LoggerFactory.getLogger(NormalMessageSendAndRecvIT.class);
    private RMQNormalConsumer consumer = null;
    private RMQNormalProducer producer = null;
    private String topic = null;
    private String group = null;
    private DefaultMQAdminExt defaultMQAdminExt;

    /**
     * Creates a topic and consumer group, then starts a producer, a consumer
     * subscribed with the {@code "*"} expression, and an admin client.
     *
     * @throws Exception if any of the clients cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        this.topic = initTopic();
        this.group = initConsumerGroup();
        logger.info("use topic: {};", this.topic);
        this.producer = getProducer(NAMESRV_ADDR, this.topic);
        this.consumer = getConsumer(NAMESRV_ADDR, this.group, this.topic, "*", new RMQNormalListener());
        this.defaultMQAdminExt = getAdmin(NAMESRV_ADDR);
        this.defaultMQAdminExt.start();
    }

    /** Delegates to {@link BaseConf#shutdown()} after each test. */
    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    /**
     * Sends {@code 10} messages to each publish queue of the topic and verifies
     * that all of them are consumed, that none carries a {@code POP_CK}
     * property, and that consumer and broker offsets of every queue equal the
     * number of messages sent to it.
     *
     * @throws Exception if sending, consuming or querying consume stats fails
     */
    @Test
    public void testSynSendMessage() throws Exception {
        AtomicReference<List<MessageQueue>> messageQueueList = new AtomicReference<>();
        AtomicReference<ConsumeStats> consumeStats = new AtomicReference<>();

        // Poll until the publish queues are known and every one of them shows up
        // in the group's offset table; only then is it meaningful to start sending.
        Awaitility.await().atMost(Duration.ofSeconds(120L)).until(() -> {
            try {
                consumeStats.set(this.defaultMQAdminExt.examineConsumeStats(this.group));
                messageQueueList.set(this.producer.getProducer().fetchPublishMessageQueues(this.topic));
                return !messageQueueList.get().isEmpty()
                    && null != consumeStats.get()
                    && consumeStats.get().getOffsetTable().keySet().containsAll(messageQueueList.get());
            }
            catch (MQClientException e) {
                logger.debug("Exception raised while checking producer and consumer are started", e);
                return false;
            }
        });

        int msgSize = 10;
        List<MessageQueue> queues = messageQueueList.get();
        for (MessageQueue messageQueue : queues) {
            this.producer.send(msgSize, messageQueue);
        }
        Assert.assertEquals("Not all sent succeeded", msgSize * queues.size(), this.producer.getAllUndupMsgBody().size());

        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        Truth.assertThat(VerifyUtils.getFilterdMessage(this.producer.getAllMsgBody(), this.consumer.getListener().getAllMsgBody()))
            .containsExactlyElementsIn(this.producer.getAllMsgBody());
        for (Object o : this.consumer.getListener().getAllOriginMsg()) {
            MessageClientExt msg = (MessageClientExt)o;
            Truth.assertThat(msg.getProperty("POP_CK")).isNull();
        }

        // The consumer is shut down before the statistics are re-read and compared.
        this.consumer.getConsumer().shutdown();
        consumeStats.set(this.defaultMQAdminExt.examineConsumeStats(this.group));
        for (MessageQueue messageQueue : queues) {
            Assert.assertTrue(consumeStats.get().getOffsetTable().containsKey(messageQueue));
            OffsetWrapper offsets = consumeStats.get().getOffsetTable().get(messageQueue);
            Assert.assertEquals(msgSize, offsets.getConsumerOffset());
            Assert.assertEquals(msgSize, offsets.getBrokerOffset());
        }
    }

    /**
     * Runs {@link #testSynSendMessage()} with the brokers restarted so that
     * {@code enableBuildConsumeQueueConcurrently} is {@code true}, and restarts
     * them with the flag set back to {@code false} afterwards.
     *
     * @throws Exception if the wrapped scenario or a broker restart fails
     */
    @Test
    public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception {
        this.resetStoreConfigWithEnableBuildConsumeQueueConcurrently(true);
        try {
            this.testSynSendMessage();
        }
        finally {
            // The broker controllers are shared static state; restore the flag even
            // when the scenario fails so later tests do not inherit this setting.
            this.resetStoreConfigWithEnableBuildConsumeQueueConcurrently(false);
        }
    }

    /**
     * Restarts the three shared brokers with the given value of
     * {@code enableBuildConsumeQueueConcurrently} and rebuilds the shared broker
     * list and the broker-name lookup map from the new controllers.
     *
     * @param enableBuildConsumeQueueConcurrently value written into each broker's
     *        message store config before it is started again
     */
    void resetStoreConfigWithEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) {
        // Each broker is shut down first; its existing config objects are then
        // reused (with the flag updated) to create and start the replacement.
        brokerController1.shutdown();
        MessageStoreConfig storeConfig = brokerController1.getMessageStoreConfig();
        BrokerConfig brokerConfig = brokerController1.getBrokerConfig();
        storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
        brokerController1 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);

        brokerController2.shutdown();
        storeConfig = brokerController2.getMessageStoreConfig();
        brokerConfig = brokerController2.getBrokerConfig();
        storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
        brokerController2 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);

        brokerController3.shutdown();
        storeConfig = brokerController3.getMessageStoreConfig();
        brokerConfig = brokerController3.getBrokerConfig();
        storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
        brokerController3 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);

        // Publish the replacement controllers through the shared collections.
        brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
        brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
    }
}

