package org.apache.rocketmq.test.offset;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.message.MessageQueueMsg;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

/**
 * Integration tests for resetting consumer offsets on the brokers started by {@link BaseConf}.
 *
 * <p>Every test gets a fresh topic, a producer, a push consumer backed by an
 * {@link RMQNormalListener}, and a started {@link DefaultMQAdminExt}. Before each test the
 * brokers are switched to short polling and server-side offset reset.
 *
 * <p>Tests run in ascending method-name order (see {@link FixMethodOrder}).
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OffsetResetIT extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetIT.class);

    /** Size argument handed to {@link MessageQueueMsg}; presumably messages per queue - confirm. */
    private static final int MESSAGES_PER_QUEUE = 100;
    /** Upper bound, in milliseconds, passed to the listener while waiting for the initial consumption. */
    private static final int CONSUME_WAIT_MILLIS = 120000;
    /** Timeout, in milliseconds, for direct broker RPCs issued through the admin client. */
    private static final long ADMIN_RPC_TIMEOUT_MILLIS = 3000L;
    /** Request code used for the reset-offset command; NOTE(review): looks like RequestCode.INVOKE_BROKER_TO_RESET_OFFSET - confirm. */
    private static final int RESET_OFFSET_REQUEST_CODE = 222;
    /** Queue id whose offset is rewound in the single-queue test. */
    private static final int SINGLE_RESET_QUEUE_ID = 3;
    /** Polling cadence and overall deadline shared by all Awaitility waits. */
    private static final Duration AWAIT_POLL_INTERVAL = Duration.ofSeconds(1L);
    private static final Duration AWAIT_DEADLINE = Duration.ofMinutes(3L);

    private RMQNormalListener listener;
    private RMQNormalProducer producer;
    private RMQNormalConsumer consumer;
    private DefaultMQAdminExt defaultMQAdminExt;
    private String topic;

    /**
     * Creates the topic and clients for a test and reconfigures every broker.
     *
     * @throws MQClientException if the admin client fails to start
     */
    @Before
    public void init() throws MQClientException {
        this.topic = initTopic();
        LOGGER.info("use topic: {};", this.topic);
        for (BrokerController broker : brokerControllerList) {
            // Short polling keeps pull requests from parking on the broker for long.
            broker.getBrokerConfig().setLongPollingEnable(false);
            broker.getBrokerConfig().setShortPollingTimeMills(500L);
            broker.getBrokerConfig().setUseServerSideResetOffset(true);
        }
        this.listener = new RMQNormalListener();
        this.producer = getProducer(NAMESRV_ADDR, this.topic);
        this.consumer = getConsumer(NAMESRV_ADDR, this.topic, "*", this.listener);
        this.defaultMQAdminExt = BaseConf.getAdmin(NAMESRV_ADDR);
        this.defaultMQAdminExt.start();
    }

    /** Delegates clean-up to the inherited {@code shutdown()}. */
    @After
    public void tearDown() {
        shutdown();
    }

    /** Builds a reset-offset request header and checks that it can be wrapped in a request command. */
    @Test
    public void testEncodeOffsetHeader() {
        ResetOffsetRequestHeader header = new ResetOffsetRequestHeader();
        header.setTopic(this.topic);
        header.setGroup(this.consumer.getConsumerGroup());
        header.setTimestamp(System.currentTimeMillis());
        header.setForce(false);
        RemotingCommand request = RemotingCommand.createRequestCommand(RESET_OFFSET_REQUEST_CODE, header);
        Assert.assertNotNull(request);
    }

    /**
     * Sums, over all brokers and queues, the gap between the queue's max offset and the
     * group's committed offset, cross-checking the broker's local view against the consume
     * stats returned over RPC.
     *
     * @param topic topic whose queues are inspected
     * @param group consumer group whose committed offsets are inspected
     * @return total number of messages not yet acknowledged by the group
     * @throws Exception if the consume-stats RPC fails
     */
    private long getConsumerLag(String topic, String group) throws Exception {
        long totalLag = 0L;
        for (BrokerController broker : brokerControllerList) {
            Map<MessageQueue, OffsetWrapper> offsetTable = this.defaultMQAdminExt
                .getDefaultMQAdminExtImpl()
                .getMqClientInstance()
                .getMQClientAPIImpl()
                .getConsumeStats(broker.getBrokerAddr(), group, topic, ADMIN_RPC_TIMEOUT_MILLIS)
                .getOffsetTable();
            for (Map.Entry<MessageQueue, OffsetWrapper> entry : offsetTable.entrySet()) {
                MessageQueue queue = entry.getKey();
                OffsetWrapper reported = entry.getValue();
                // Stats fetched from a broker must only describe that broker's queues.
                Assert.assertEquals(broker.getBrokerConfig().getBrokerName(), queue.getBrokerName());

                long maxOffset = broker.getMessageStore().getMaxOffsetInQueue(topic, queue.getQueueId());
                long committedOffset = broker.getConsumerOffsetManager().queryOffset(group, topic, queue.getQueueId());
                Assert.assertEquals(maxOffset, reported.getBrokerOffset());
                Assert.assertEquals(committedOffset, reported.getConsumerOffset());
                totalLag += maxOffset - committedOffset;
            }
        }
        return totalLag;
    }

    /**
     * Rewinds one queue on every broker to offset 0 and waits until the listener has
     * received the expected number of additional messages.
     */
    @Test
    public void testResetOffsetSingleQueue() throws Exception {
        sendAndAwaitFullConsumption(this.producer.getMessageQueue());

        // Take the baseline before any reset is issued: once a reset lands, redelivered
        // messages start bumping the counter and would otherwise inflate the target.
        int consumedBeforeReset = this.listener.getMsgIndex().get();
        for (BrokerController broker : brokerControllerList) {
            this.defaultMQAdminExt.resetOffsetByQueueId(broker.getBrokerAddr(),
                this.consumer.getConsumerGroup(), this.consumer.getTopic(), SINGLE_RESET_QUEUE_ID, 0L);
        }

        // One rewound queue per broker.
        awaitRedelivery(consumedBeforeReset, brokerControllerList.size() * MESSAGES_PER_QUEUE);
    }

    /**
     * Resets the whole group to a timestamp taken before any message was sent and waits
     * until the listener has received every message a second time.
     */
    @Test
    public void testResetOffsetTotal() throws Exception {
        long timestampBeforeSend = System.currentTimeMillis();
        List<MessageQueue> queues = this.producer.getMessageQueue();
        sendAndAwaitFullConsumption(queues);

        // Baseline is captured before the reset for the same reason as in the single-queue test.
        int consumedBeforeReset = this.listener.getMsgIndex().get();
        for (BrokerController broker : brokerControllerList) {
            this.defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl()
                .invokeBrokerToResetOffset(broker.getBrokerAddr(), this.consumer.getTopic(),
                    this.consumer.getConsumerGroup(), timestampBeforeSend, true, ADMIN_RPC_TIMEOUT_MILLIS);
        }

        awaitRedelivery(consumedBeforeReset, queues.size() * MESSAGES_PER_QUEUE);
    }

    /** Verifies that, once everything is consumed, the in-flight diff reported by every broker sums to zero. */
    @Test
    public void testPullOffsetTotal() throws Exception {
        sendAndAwaitFullConsumption(this.producer.getMessageQueue());

        long inflightTotal = 0L;
        for (BrokerController broker : brokerControllerList) {
            inflightTotal += this.defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl()
                .getConsumeStats(broker.getBrokerAddr(), this.consumer.getConsumerGroup(),
                    this.consumer.getTopic(), ADMIN_RPC_TIMEOUT_MILLIS)
                .computeInflightTotalDiff();
        }
        Assert.assertEquals(0L, inflightTotal);
    }

    /** Sends a batch to the given queues, waits for the listener to see it, then waits for the group's lag to reach zero. */
    private void sendAndAwaitFullConsumption(List<MessageQueue> queues) {
        this.producer.send(new MessageQueueMsg(queues, MESSAGES_PER_QUEUE).getMsgsWithMQ());
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), CONSUME_WAIT_MILLIS);
        Awaitility.await().pollInterval(AWAIT_POLL_INTERVAL).atMost(AWAIT_DEADLINE)
            .until(() -> getConsumerLag(this.topic, this.consumer.getConsumerGroup()) == 0L);
    }

    /** Blocks until the listener's message counter has grown by at least {@code expectedExtra} over {@code baseline}. */
    private void awaitRedelivery(int baseline, int expectedExtra) {
        final long target = (long) baseline + expectedExtra;
        Awaitility.await().pollInterval(AWAIT_POLL_INTERVAL).atMost(AWAIT_DEADLINE)
            .until(() -> this.listener.getMsgIndex().get() >= target);
    }
}
