package org.apache.rocketmq.test.retry;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.offset.OffsetResetIT;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/retry/PopConsumerRetryIT.class */
public class PopConsumerRetryIT extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetIT.class);
    private DefaultMQAdminExt defaultMQAdminExt = null;
    private String topicName = null;
    private String groupName = null;

    @Before
    public void init() throws MQClientException {
        this.topicName = "topic-" + RandomStringUtils.randomAlphabetic(72).toUpperCase();
        this.groupName = "group-" + RandomStringUtils.randomAlphabetic(72).toUpperCase();
        LOGGER.info(String.format("use topic: %s, group: %s", this.topicName, this.groupName));
        IntegrationTestBase.initTopic(this.topicName, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
        this.defaultMQAdminExt = getAdmin(NAMESRV_ADDR);
        this.defaultMQAdminExt.start();
    }

    @After
    public void tearDown() {
        shutdown();
    }

    private void switchPop(String str, String str2) throws Exception {
        for (String str3 : (Set) this.defaultMQAdminExt.examineBrokerClusterInfo().getBrokerAddrTable().values().stream().map((v0) -> {
            return v0.selectBrokerAddr();
        }).collect(Collectors.toSet())) {
            this.defaultMQAdminExt.createAndUpdateTopicConfig(str3, new TopicConfig(str2, 1, 1, 6));
            this.defaultMQAdminExt.setMessageRequestMode(str3, str2, str, MessageRequestMode.POP, 8, 3000L);
        }
    }

    @Test
    public void testNormalMessageUseMessageVersionV2() throws Exception {
        switchPop(this.groupName, this.topicName);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.groupName);
        defaultMQPushConsumer.subscribe(this.topicName, "*");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        defaultMQPushConsumer.setConsumeThreadMin(1);
        defaultMQPushConsumer.setConsumeThreadMax(1);
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        defaultMQPushConsumer.setNamesrvAddr(NAMESRV_ADDR);
        defaultMQPushConsumer.setClientRebalance(false);
        defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            Iterator it = list.iterator();
            if (!it.hasNext()) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            MessageExt messageExt = (MessageExt) it.next();
            LOGGER.debug(String.format("messageId: %s, times: %d, topic: %s", messageExt.getMsgId(), Integer.valueOf(messageExt.getReconsumeTimes()), messageExt.getTopic()));
            if (messageExt.getReconsumeTimes() < 2) {
                atomicInteger2.incrementAndGet();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            atomicInteger.incrementAndGet();
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        defaultMQPushConsumer.start();
        LOGGER.info("Consumer Started...");
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("PID-1", false, (String) null);
        defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
        defaultMQProducer.setNamesrvAddr(NAMESRV_ADDR);
        defaultMQProducer.start();
        LOGGER.info("Producer Started...%n");
        TimeUnit.SECONDS.sleep(3L);
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertEquals(SendStatus.SEND_OK, defaultMQProducer.send(new Message(this.topicName, "*", "Hello world".getBytes("UTF-8"))).getSendStatus());
        }
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(90L, TimeUnit.SECONDS).until(() -> {
            LOGGER.debug(String.format("retry: %d, success: %d", Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger.get())));
            return Boolean.valueOf(atomicInteger2.get() == i * 2 && atomicInteger.get() == i);
        });
    }

    @Test
    public void testFIFOMessageUseMessageVersionV2() throws Exception {
        switchPop(this.groupName, this.topicName);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.groupName);
        defaultMQPushConsumer.subscribe(this.topicName, "*");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        defaultMQPushConsumer.setConsumeThreadMin(1);
        defaultMQPushConsumer.setConsumeThreadMax(1);
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        defaultMQPushConsumer.setNamesrvAddr(NAMESRV_ADDR);
        defaultMQPushConsumer.setClientRebalance(false);
        defaultMQPushConsumer.registerMessageListener((list, consumeOrderlyContext) -> {
            Iterator it = list.iterator();
            if (!it.hasNext()) {
                return ConsumeOrderlyStatus.SUCCESS;
            }
            MessageExt messageExt = (MessageExt) it.next();
            LOGGER.debug(String.format("messageId: %s, times: %d, topic: %s", messageExt.getMsgId(), Integer.valueOf(messageExt.getReconsumeTimes()), messageExt.getTopic()));
            if (messageExt.getReconsumeTimes() < 2) {
                atomicInteger2.incrementAndGet();
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            atomicInteger.incrementAndGet();
            return ConsumeOrderlyStatus.SUCCESS;
        });
        defaultMQPushConsumer.start();
        LOGGER.info("Consumer Started...");
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("PID-1", false, (String) null);
        defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
        defaultMQProducer.setNamesrvAddr(NAMESRV_ADDR);
        defaultMQProducer.start();
        LOGGER.info("Producer Started...%n");
        TimeUnit.SECONDS.sleep(3L);
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertEquals(SendStatus.SEND_OK, defaultMQProducer.send(new Message(this.topicName, "*", "Hello world".getBytes("UTF-8"))).getSendStatus());
        }
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(90L, TimeUnit.SECONDS).until(() -> {
            LOGGER.debug(String.format("retry: %d, success: %d", Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger.get())));
            return Boolean.valueOf(atomicInteger2.get() == i * 2 && atomicInteger.get() == i);
        });
    }
}
