/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.retry;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.offset.OffsetResetIT;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class PopConsumerRetryIT
extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetIT.class);
    private DefaultMQAdminExt defaultMQAdminExt = null;
    private String topicName = null;
    private String groupName = null;

    @Before
    public void init() throws MQClientException {
        this.topicName = "topic-" + RandomStringUtils.randomAlphabetic((int)72).toUpperCase();
        this.groupName = "group-" + RandomStringUtils.randomAlphabetic((int)72).toUpperCase();
        LOGGER.info(String.format("use topic: %s, group: %s", this.topicName, this.groupName));
        IntegrationTestBase.initTopic(this.topicName, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
        this.defaultMQAdminExt = PopConsumerRetryIT.getAdmin(NAMESRV_ADDR);
        this.defaultMQAdminExt.start();
    }

    @After
    public void tearDown() {
        PopConsumerRetryIT.shutdown();
    }

    private void switchPop(String groupName, String topicName) throws Exception {
        ClusterInfo clusterInfo = this.defaultMQAdminExt.examineBrokerClusterInfo();
        Set brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
        for (String brokerAddr : brokerAddrs) {
            TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, 6);
            this.defaultMQAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
            this.defaultMQAdminExt.setMessageRequestMode(brokerAddr, topicName, groupName, MessageRequestMode.POP, 8, 3000L);
        }
    }

    @Test
    public void testNormalMessageUseMessageVersionV2() throws Exception {
        this.switchPop(this.groupName, this.topicName);
        AtomicInteger successCount = new AtomicInteger();
        AtomicInteger retryCount = new AtomicInteger();
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.groupName);
        consumer.subscribe(this.topicName, "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setClientRebalance(false);
        consumer.registerMessageListener((msgs, context) -> {
            Iterator iterator = msgs.iterator();
            if (iterator.hasNext()) {
                MessageExt message = (MessageExt)iterator.next();
                LOGGER.debug(String.format("messageId: %s, times: %d, topic: %s", message.getMsgId(), message.getReconsumeTimes(), message.getTopic()));
                if (message.getReconsumeTimes() < 2) {
                    retryCount.incrementAndGet();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                successCount.incrementAndGet();
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        LOGGER.info("Consumer Started...");
        DefaultMQProducer producer = new DefaultMQProducer("PID-1", false, null);
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        LOGGER.info("Producer Started...%n");
        TimeUnit.SECONDS.sleep(3L);
        int total = 10;
        for (int i = 0; i < total; ++i) {
            Message msg = new Message(this.topicName, "*", "Hello world".getBytes("UTF-8"));
            SendResult sendResult = producer.send(msg);
            Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
        }
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(90L, TimeUnit.SECONDS).until(() -> {
            LOGGER.debug(String.format("retry: %d, success: %d", retryCount.get(), successCount.get()));
            return retryCount.get() == total * 2 && successCount.get() == total;
        });
    }

    @Test
    public void testFIFOMessageUseMessageVersionV2() throws Exception {
        this.switchPop(this.groupName, this.topicName);
        AtomicInteger successCount = new AtomicInteger();
        AtomicInteger retryCount = new AtomicInteger();
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.groupName);
        consumer.subscribe(this.topicName, "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setClientRebalance(false);
        consumer.registerMessageListener((msgs, context) -> {
            Iterator iterator = msgs.iterator();
            if (iterator.hasNext()) {
                MessageExt message = (MessageExt)iterator.next();
                LOGGER.debug(String.format("messageId: %s, times: %d, topic: %s", message.getMsgId(), message.getReconsumeTimes(), message.getTopic()));
                if (message.getReconsumeTimes() < 2) {
                    retryCount.incrementAndGet();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                successCount.incrementAndGet();
                return ConsumeOrderlyStatus.SUCCESS;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
        LOGGER.info("Consumer Started...");
        DefaultMQProducer producer = new DefaultMQProducer("PID-1", false, null);
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        LOGGER.info("Producer Started...%n");
        TimeUnit.SECONDS.sleep(3L);
        int total = 10;
        for (int i = 0; i < total; ++i) {
            Message msg = new Message(this.topicName, "*", "Hello world".getBytes("UTF-8"));
            SendResult sendResult = producer.send(msg);
            Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
        }
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(90L, TimeUnit.SECONDS).until(() -> {
            LOGGER.debug(String.format("retry: %d, success: %d", retryCount.get(), successCount.get()));
            return retryCount.get() == total * 2 && successCount.get() == total;
        });
    }
}

