package org.apache.rocketmq.test.offset;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/offset/OffsetResetForPopIT.class */
public class OffsetResetForPopIT extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetForPopIT.class);
    private String topic;
    private String group;
    private RMQNormalProducer producer = null;
    private RMQPopConsumer consumer = null;
    private DefaultMQAdminExt adminExt;

    @Before
    public void setUp() throws Exception {
        brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
        this.adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
        this.adminExt.start();
        this.topic = MQRandomUtils.getRandomTopic();
        createAndWaitTopicRegister(BROKER1_NAME, this.topic);
        this.group = initConsumerGroup();
        LOGGER.info(String.format("use topic: %s, group: %s", this.topic, this.group));
        this.producer = getProducer(NAMESRV_ADDR, this.topic);
    }

    @After
    public void tearDown() {
        shutdown();
    }

    private void createAndWaitTopicRegister(String str, String str2) throws Exception {
        String fetchMasterAddrByBrokerName = CommandUtil.fetchMasterAddrByBrokerName(this.adminExt, str);
        TopicConfig topicConfig = new TopicConfig(str2);
        topicConfig.setReadQueueNums(1);
        topicConfig.setWriteQueueNums(1);
        this.adminExt.createAndUpdateTopicConfig(fetchMasterAddrByBrokerName, topicConfig);
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(MQAdminTestUtils.checkTopicExist(this.adminExt, str2));
        });
    }

    private void resetOffsetInner(long j) {
        try {
            this.adminExt.resetOffsetByQueueId(brokerController1.getBrokerAddr(), this.consumer.getConsumerGroup(), this.consumer.getTopic(), 0, j);
        } catch (Exception e) {
        }
    }

    private void ackMessageSync(MessageExt messageExt) {
        try {
            this.consumer.ackAsync(brokerController1.getBrokerAddr(), messageExt.getProperty("POP_CK")).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void ackMessageSync(List<MessageExt> list) {
        if (list != null) {
            list.forEach(this::ackMessageSync);
        }
    }

    @Test
    public void testResetOffsetAfterPop() throws Exception {
        this.producer.send(10);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener());
        this.consumer.start();
        MessageQueue messageQueue = new MessageQueue(this.topic, BROKER1_NAME, 0);
        Assert.assertEquals(10L, this.consumer.pop(brokerController1.getBrokerAddr(), messageQueue).getMsgFoundList().size());
        resetOffsetInner(4);
        PopResult pop = this.consumer.pop(brokerController1.getBrokerAddr(), messageQueue);
        Assert.assertTrue((pop == null || pop.getMsgFoundList() == null) ? false : true);
        Assert.assertEquals(10 - 4, pop.getMsgFoundList().size());
    }

    @Test
    public void testResetOffsetThenAckOldForPopOrderly() throws Exception {
        this.producer.send(10);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener());
        this.consumer.start();
        MessageQueue messageQueue = new MessageQueue(this.topic, BROKER1_NAME, 0);
        PopResult popOrderly = this.consumer.popOrderly(brokerController1.getBrokerAddr(), messageQueue);
        Assert.assertEquals(10L, popOrderly.getMsgFoundList().size());
        resetOffsetInner(2);
        Assert.assertEquals(2, ((OffsetWrapper) this.adminExt.examineConsumeStats(this.group, this.topic).getOffsetTable().get(messageQueue)).getConsumerOffset());
        PopResult popOrderly2 = this.consumer.popOrderly(brokerController1.getBrokerAddr(), messageQueue);
        Assert.assertTrue((popOrderly2 == null || popOrderly2.getMsgFoundList() == null) ? false : true);
        Assert.assertEquals(10 - 2, popOrderly2.getMsgFoundList().size());
        ackMessageSync(popOrderly.getMsgFoundList());
        Assert.assertTrue(brokerController1.getConsumerOrderInfoManager().checkBlock((String) null, this.topic, this.group, 0, 30000L));
        ackMessageSync(popOrderly2.getMsgFoundList());
        Assert.assertFalse(brokerController1.getConsumerOrderInfoManager().checkBlock((String) null, this.topic, this.group, 0, 30000L));
    }

    @Test
    public void testRestOffsetToSkipMsgForPopOrderly() throws Exception {
        this.producer.send(10);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener());
        resetOffsetInner(4);
        this.consumer.start();
        PopResult popOrderly = this.consumer.popOrderly(brokerController1.getBrokerAddr(), new MessageQueue(this.topic, BROKER1_NAME, 0));
        Assert.assertEquals(10 - 4, popOrderly.getMsgFoundList().size());
        Assert.assertTrue(brokerController1.getConsumerOrderInfoManager().checkBlock((String) null, this.topic, this.group, 0, 30000L));
        ackMessageSync(popOrderly.getMsgFoundList());
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertFalse(brokerController1.getConsumerOrderInfoManager().checkBlock((String) null, this.topic, this.group, 0, 30000L));
    }

    @Test
    public void testResetOffsetAfterPopWhenOpenBufferAndWait() throws Exception {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        this.producer.send(10);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener());
        this.consumer.start();
        MessageQueue messageQueue = new MessageQueue(this.topic, BROKER1_NAME, 0);
        Assert.assertEquals(10L, this.consumer.pop(brokerController1.getBrokerAddr(), messageQueue).getMsgFoundList().size());
        resetOffsetInner(4);
        TimeUnit.MILLISECONDS.sleep(brokerController1.getBrokerConfig().getPopCkStayBufferTimeOut());
        PopResult pop = this.consumer.pop(brokerController1.getBrokerAddr(), messageQueue);
        Assert.assertTrue((pop == null || pop.getMsgFoundList() == null) ? false : true);
        Assert.assertEquals(10 - 4, pop.getMsgFoundList().size());
    }

    @Test
    public void testResetOffsetWhilePopWhenOpenBuffer() {
        testResetOffsetWhilePop(8, false, false, 5);
    }

    @Test
    public void testResetOffsetWhilePopWhenOpenBufferAndAck() {
        testResetOffsetWhilePop(8, false, true, 5);
    }

    @Test
    public void testMultipleResetOffsetWhilePopWhenOpenBufferAndAck() {
        testResetOffsetWhilePop(8, false, true, 3, 5);
    }

    @Test
    public void testResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
        testResetOffsetWhilePop(2, true, true, 8);
    }

    @Test
    public void testMultipleResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
        testResetOffsetWhilePop(2, true, true, 5, 8);
    }

    private void testResetOffsetWhilePop(int i, boolean z, boolean z2, int... iArr) {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        this.producer.send(10L);
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener(), 1);
        MessageQueue messageQueue = new MessageQueue(this.topic, BROKER1_NAME, 0);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.consumer.start();
        Executors.newSingleThreadScheduledExecutor().execute(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            while (System.currentTimeMillis() - currentTimeMillis <= 30000) {
                try {
                    PopResult pop = this.consumer.pop(brokerController1.getBrokerAddr(), messageQueue);
                    if (pop != null && pop.getMsgFoundList() != null) {
                        int addAndGet = atomicInteger.addAndGet(pop.getMsgFoundList().size());
                        if (z2) {
                            ackMessageSync(pop.getMsgFoundList());
                        }
                        if (addAndGet == i) {
                            for (int i2 : iArr) {
                                resetOffsetInner(i2);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            boolean z3 = true;
            if (z) {
                z3 = atomicInteger.get() < 10;
            }
            return Boolean.valueOf(z3 & (atomicInteger.get() >= (i + 10) - iArr[iArr.length - 1]));
        });
    }

    @Test
    public void testResetFutureOffsetWhilePopOrderlyAndAck() {
        testResetOffsetWhilePopOrderly(1, Lists.newArrayList(new Integer[]{0, 5, 6, 7, 8, 9}), Lists.newArrayList(new Integer[]{5}), 6);
    }

    @Test
    public void testMultipleResetFutureOffsetWhilePopOrderlyAndAck() {
        testResetOffsetWhilePopOrderly(1, Lists.newArrayList(new Integer[]{0, 5, 6, 7, 8, 9}), Lists.newArrayList(new Integer[]{3, 5}), 6);
    }

    @Test
    public void testResetOffsetWhilePopOrderlyAndAck() {
        testResetOffsetWhilePopOrderly(5, Lists.newArrayList(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), Lists.newArrayList(new Integer[]{3}), 12);
    }

    @Test
    public void testMultipleResetOffsetWhilePopOrderlyAndAck() {
        testResetOffsetWhilePopOrderly(5, Lists.newArrayList(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), Lists.newArrayList(new Integer[]{3, 1}), 14);
    }

    private void testResetOffsetWhilePopOrderly(int i, List<Integer> list, List<Integer> list2, int i2) {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        for (int i3 = 0; i3 < 10; i3++) {
            this.producer.send(new Message(this.topic, String.valueOf(i3).getBytes()));
        }
        this.consumer = new RMQPopConsumer(NAMESRV_ADDR, this.topic, "*", this.group, new RMQNormalListener(), 1);
        MessageQueue messageQueue = new MessageQueue(this.topic, BROKER1_NAME, 0);
        Set newSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap());
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.consumer.start();
        Executors.newSingleThreadScheduledExecutor().execute(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            while (System.currentTimeMillis() - currentTimeMillis <= 30000) {
                try {
                    PopResult popOrderly = this.consumer.popOrderly(brokerController1.getBrokerAddr(), messageQueue);
                    if (popOrderly != null && popOrderly.getMsgFoundList() != null) {
                        int addAndGet = atomicInteger.addAndGet(popOrderly.getMsgFoundList().size());
                        for (MessageExt messageExt : popOrderly.getMsgFoundList()) {
                            newSetFromMap.add(Integer.valueOf(new String(messageExt.getBody())));
                            ackMessageSync(messageExt);
                        }
                        if (addAndGet == i) {
                            Iterator it = list2.iterator();
                            while (it.hasNext()) {
                                resetOffsetInner(((Integer) it.next()).intValue());
                            }
                        }
                    }
                } catch (Exception e) {
                }
            }
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            boolean z = true;
            if (list.size() == newSetFromMap.size() && atomicInteger.get() == i2) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    z &= newSetFromMap.contains((Integer) it.next());
                }
                return Boolean.valueOf(z);
            }
            return false;
        });
    }
}
