/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.offset;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class OffsetResetForPopIT
extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetForPopIT.class);
    private String topic;
    private String group;
    private RMQNormalProducer producer = null;
    private RMQPopConsumer consumer = null;
    private DefaultMQAdminExt adminExt;

    @Before
    public void setUp() throws Exception {
        brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
        this.adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
        this.adminExt.start();
        this.topic = MQRandomUtils.getRandomTopic();
        this.createAndWaitTopicRegister(BROKER1_NAME, this.topic);
        this.group = OffsetResetForPopIT.initConsumerGroup();
        LOGGER.info(String.format("use topic: %s, group: %s", this.topic, this.group));
        this.producer = OffsetResetForPopIT.getProducer(NAMESRV_ADDR, this.topic);
    }

    @After
    public void tearDown() {
        OffsetResetForPopIT.shutdown();
    }

    private void createAndWaitTopicRegister(String brokerName, String topic) throws Exception {
        String brokerAddress = CommandUtil.fetchMasterAddrByBrokerName((MQAdminExt)this.adminExt, (String)brokerName);
        TopicConfig topicConfig = new TopicConfig(topic);
        topicConfig.setReadQueueNums(1);
        topicConfig.setWriteQueueNums(1);
        this.adminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> MQAdminTestUtils.checkTopicExist((DefaultMQAdminExt)this.adminExt, (String)topic));
    }

    private void resetOffsetInner(long resetOffset) {
        try {
            this.adminExt.resetOffsetByQueueId(brokerController1.getBrokerAddr(), this.consumer.getConsumerGroup(), this.consumer.getTopic(), 0, resetOffset);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void ackMessageSync(MessageExt messageExt) {
        try {
            this.consumer.ackAsync(brokerController1.getBrokerAddr(), messageExt.getProperty("POP_CK")).get();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void ackMessageSync(List<MessageExt> messageExtList) {
        if (messageExtList != null) {
            messageExtList.forEach(this::ackMessageSync);
        }
    }

    @Test
    public void testResetOffsetAfterPop() throws Exception {
        int messageCount = 10;
        int resetOffset = 4;
        this.producer.send((long)messageCount);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener());
        this.consumer.start();
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        PopResult popResult = this.consumer.pop(brokerController1.getBrokerAddr(), mq);
        Assert.assertEquals((long)10L, (long)popResult.getMsgFoundList().size());
        this.resetOffsetInner(resetOffset);
        popResult = this.consumer.pop(brokerController1.getBrokerAddr(), mq);
        Assert.assertTrue((popResult != null && popResult.getMsgFoundList() != null ? 1 : 0) != 0);
        Assert.assertEquals((long)(messageCount - resetOffset), (long)popResult.getMsgFoundList().size());
    }

    @Test
    public void testResetOffsetThenAckOldForPopOrderly() throws Exception {
        int messageCount = 10;
        int resetOffset = 2;
        this.producer.send((long)messageCount);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener());
        this.consumer.start();
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        PopResult popResult1 = this.consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
        Assert.assertEquals((long)10L, (long)popResult1.getMsgFoundList().size());
        this.resetOffsetInner(resetOffset);
        ConsumeStats consumeStats = this.adminExt.examineConsumeStats(this.group, this.topic);
        Assert.assertEquals((long)resetOffset, (long)((OffsetWrapper)consumeStats.getOffsetTable().get(mq)).getConsumerOffset());
        PopResult popResult2 = this.consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
        Assert.assertTrue((popResult2 != null && popResult2.getMsgFoundList() != null ? 1 : 0) != 0);
        Assert.assertEquals((long)(messageCount - resetOffset), (long)popResult2.getMsgFoundList().size());
        this.ackMessageSync(popResult1.getMsgFoundList());
        Assert.assertTrue((boolean)brokerController1.getConsumerOrderInfoManager().checkBlock(null, this.topic, this.group, 0, 30000L));
        this.ackMessageSync(popResult2.getMsgFoundList());
        Assert.assertFalse((boolean)brokerController1.getConsumerOrderInfoManager().checkBlock(null, this.topic, this.group, 0, 30000L));
    }

    @Test
    public void testRestOffsetToSkipMsgForPopOrderly() throws Exception {
        int messageCount = 10;
        int resetOffset = 4;
        this.producer.send((long)messageCount);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener());
        this.resetOffsetInner(resetOffset);
        this.consumer.start();
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        PopResult popResult = this.consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
        Assert.assertEquals((long)(messageCount - resetOffset), (long)popResult.getMsgFoundList().size());
        Assert.assertTrue((boolean)brokerController1.getConsumerOrderInfoManager().checkBlock(null, this.topic, this.group, 0, 30000L));
        this.ackMessageSync(popResult.getMsgFoundList());
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertFalse((boolean)brokerController1.getConsumerOrderInfoManager().checkBlock(null, this.topic, this.group, 0, 30000L));
    }

    @Test
    public void testResetOffsetAfterPopWhenOpenBufferAndWait() throws Exception {
        int messageCount = 10;
        int resetOffset = 4;
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        this.producer.send((long)messageCount);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener());
        this.consumer.start();
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        PopResult popResult = this.consumer.pop(brokerController1.getBrokerAddr(), mq);
        Assert.assertEquals((long)10L, (long)popResult.getMsgFoundList().size());
        this.resetOffsetInner(resetOffset);
        TimeUnit.MILLISECONDS.sleep(brokerController1.getBrokerConfig().getPopCkStayBufferTimeOut());
        popResult = this.consumer.pop(brokerController1.getBrokerAddr(), mq);
        Assert.assertTrue((popResult != null && popResult.getMsgFoundList() != null ? 1 : 0) != 0);
        Assert.assertEquals((long)(messageCount - resetOffset), (long)popResult.getMsgFoundList().size());
    }

    @Test
    public void testResetOffsetWhilePopWhenOpenBuffer() {
        this.testResetOffsetWhilePop(8, false, false, 5);
    }

    @Test
    public void testResetOffsetWhilePopWhenOpenBufferAndAck() {
        this.testResetOffsetWhilePop(8, false, true, 5);
    }

    @Test
    public void testMultipleResetOffsetWhilePopWhenOpenBufferAndAck() {
        this.testResetOffsetWhilePop(8, false, true, 3, 5);
    }

    @Test
    public void testResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
        this.testResetOffsetWhilePop(2, true, true, 8);
    }

    @Test
    public void testMultipleResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
        this.testResetOffsetWhilePop(2, true, true, 5, 8);
    }

    private void testResetOffsetWhilePop(int targetCount, boolean resetFuture, boolean needAck, int ... resetOffset) {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        this.producer.send(10L);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener(), 1);
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        AtomicInteger counter = new AtomicInteger(0);
        this.consumer.start();
        Executors.newSingleThreadScheduledExecutor().execute(() -> {
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start <= 30000L) {
                try {
                    PopResult popResult = this.consumer.pop(brokerController1.getBrokerAddr(), mq);
                    if (popResult == null || popResult.getMsgFoundList() == null) continue;
                    int count = counter.addAndGet(popResult.getMsgFoundList().size());
                    if (needAck) {
                        this.ackMessageSync(popResult.getMsgFoundList());
                    }
                    if (count != targetCount) continue;
                    for (int offset : resetOffset) {
                        this.resetOffsetInner(offset);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            boolean result = true;
            if (resetFuture) {
                result = counter.get() < 10;
            }
            return result &= counter.get() >= targetCount + 10 - resetOffset[resetOffset.length - 1];
        });
    }

    @Test
    public void testResetFutureOffsetWhilePopOrderlyAndAck() {
        this.testResetOffsetWhilePopOrderly(1, Lists.newArrayList((Object[])new Integer[]{0, 5, 6, 7, 8, 9}), Lists.newArrayList((Object[])new Integer[]{5}), 6);
    }

    @Test
    public void testMultipleResetFutureOffsetWhilePopOrderlyAndAck() {
        this.testResetOffsetWhilePopOrderly(1, Lists.newArrayList((Object[])new Integer[]{0, 5, 6, 7, 8, 9}), Lists.newArrayList((Object[])new Integer[]{3, 5}), 6);
    }

    @Test
    public void testResetOffsetWhilePopOrderlyAndAck() {
        this.testResetOffsetWhilePopOrderly(5, Lists.newArrayList((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), Lists.newArrayList((Object[])new Integer[]{3}), 12);
    }

    @Test
    public void testMultipleResetOffsetWhilePopOrderlyAndAck() {
        this.testResetOffsetWhilePopOrderly(5, Lists.newArrayList((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), Lists.newArrayList((Object[])new Integer[]{3, 1}), 14);
    }

    private void testResetOffsetWhilePopOrderly(int targetCount, List<Integer> expectMsgReceive, List<Integer> resetOffset, int expectCount) {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        for (int i = 0; i < 10; ++i) {
            Message msg = new Message(this.topic, String.valueOf(i).getBytes());
            this.producer.send((Object)msg);
        }
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, (AbstractListener)new RMQNormalListener(), 1);
        MessageQueue mq = new MessageQueue(this.topic, BROKER1_NAME, 0);
        Set msgReceive = Collections.newSetFromMap(new ConcurrentHashMap());
        AtomicInteger counter = new AtomicInteger(0);
        this.consumer.start();
        Executors.newSingleThreadScheduledExecutor().execute(() -> {
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start <= 30000L) {
                try {
                    PopResult popResult = this.consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
                    if (popResult == null || popResult.getMsgFoundList() == null) continue;
                    int count = counter.addAndGet(popResult.getMsgFoundList().size());
                    for (MessageExt messageExt : popResult.getMsgFoundList()) {
                        msgReceive.add(Integer.valueOf(new String(messageExt.getBody())));
                        this.ackMessageSync(messageExt);
                    }
                    if (count != targetCount) continue;
                    Iterator iterator = resetOffset.iterator();
                    while (iterator.hasNext()) {
                        int offset = (Integer)iterator.next();
                        this.resetOffsetInner(offset);
                    }
                }
                catch (Exception exception) {
                }
            }
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            boolean result = true;
            if (expectMsgReceive.size() != msgReceive.size()) {
                return false;
            }
            if (counter.get() != expectCount) {
                return false;
            }
            for (Integer expectMsg : expectMsgReceive) {
                result &= msgReceive.contains(expectMsg);
            }
            return result;
        });
    }
}

