/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.offset;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.message.MessageQueueMsg;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(value=MethodSorters.NAME_ASCENDING)
public class OffsetResetIT
extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResetIT.class);
    private RMQNormalListener listener = null;
    private RMQNormalProducer producer = null;
    private RMQNormalConsumer consumer = null;
    private DefaultMQAdminExt defaultMQAdminExt = null;
    private String topic = null;

    @Before
    public void init() throws MQClientException {
        this.topic = OffsetResetIT.initTopic();
        LOGGER.info(String.format("use topic: %s;", this.topic));
        for (BrokerController controller : brokerControllerList) {
            controller.getBrokerConfig().setLongPollingEnable(false);
            controller.getBrokerConfig().setShortPollingTimeMills(500L);
            controller.getBrokerConfig().setUseServerSideResetOffset(true);
        }
        this.listener = new RMQNormalListener();
        this.producer = OffsetResetIT.getProducer(NAMESRV_ADDR, this.topic);
        this.consumer = OffsetResetIT.getConsumer(NAMESRV_ADDR, this.topic, "*", (AbstractListener)this.listener);
        this.defaultMQAdminExt = BaseConf.getAdmin(NAMESRV_ADDR);
        this.defaultMQAdminExt.start();
    }

    @After
    public void tearDown() {
        OffsetResetIT.shutdown();
    }

    @Test
    public void testEncodeOffsetHeader() {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(this.topic);
        requestHeader.setGroup(this.consumer.getConsumerGroup());
        requestHeader.setTimestamp(System.currentTimeMillis());
        requestHeader.setForce(false);
        RemotingCommand.createRequestCommand((int)222, (CommandCustomHeader)requestHeader);
    }

    private long getConsumerLag(String topic, String group) throws Exception {
        long consumerLag = 0L;
        for (BrokerController controller : brokerControllerList) {
            ConsumeStats consumeStats = this.defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().getConsumeStats(controller.getBrokerAddr(), group, topic, 3000L);
            Map offsetTable = consumeStats.getOffsetTable();
            for (Map.Entry entry : offsetTable.entrySet()) {
                MessageQueue messageQueue = (MessageQueue)entry.getKey();
                OffsetWrapper offsetWrapper = (OffsetWrapper)entry.getValue();
                Assert.assertEquals((Object)messageQueue.getBrokerName(), (Object)controller.getBrokerConfig().getBrokerName());
                long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, messageQueue.getQueueId());
                long consumerOffset = controller.getConsumerOffsetManager().queryOffset(this.consumer.getConsumerGroup(), topic, messageQueue.getQueueId());
                Assert.assertEquals((long)brokerOffset, (long)offsetWrapper.getBrokerOffset());
                Assert.assertEquals((long)consumerOffset, (long)offsetWrapper.getConsumerOffset());
                consumerLag += brokerOffset - consumerOffset;
            }
        }
        return consumerLag;
    }

    @Test
    public void testResetOffsetSingleQueue() throws Exception {
        int msgSize = 100;
        List mqs = this.producer.getMessageQueue();
        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofMinutes(3L)).until(() -> 0L == this.getConsumerLag(this.topic, this.consumer.getConsumerGroup()));
        for (BrokerController controller : brokerControllerList) {
            this.defaultMQAdminExt.resetOffsetByQueueId(controller.getBrokerAddr(), this.consumer.getConsumerGroup(), this.consumer.getTopic(), 3, 0L);
        }
        int hasConsumeBefore = this.listener.getMsgIndex().get();
        int expectAfterReset = brokerControllerList.size() * msgSize;
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofMinutes(3L)).until(() -> {
            long expect;
            long receive = this.listener.getMsgIndex().get();
            return receive >= (expect = (long)(hasConsumeBefore + expectAfterReset));
        });
    }

    @Test
    public void testResetOffsetTotal() throws Exception {
        int msgSize = 100;
        long start = System.currentTimeMillis();
        List mqs = this.producer.getMessageQueue();
        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofMinutes(3L)).until(() -> 0L == this.getConsumerLag(this.topic, this.consumer.getConsumerGroup()));
        for (BrokerController controller : brokerControllerList) {
            this.defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().invokeBrokerToResetOffset(controller.getBrokerAddr(), this.consumer.getTopic(), this.consumer.getConsumerGroup(), start, true, 3000L);
        }
        int hasConsumeBefore = this.listener.getMsgIndex().get();
        int expectAfterReset = mqs.size() * msgSize;
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofMinutes(3L)).until(() -> {
            long expect;
            long receive = this.listener.getMsgIndex().get();
            return receive >= (expect = (long)(hasConsumeBefore + expectAfterReset));
        });
    }

    @Test
    public void testPullOffsetTotal() throws Exception {
        int msgSize = 100;
        List mqs = this.producer.getMessageQueue();
        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofMinutes(3L)).until(() -> 0L == this.getConsumerLag(this.topic, this.consumer.getConsumerGroup()));
        long expectInflight = 0L;
        for (BrokerController controller : brokerControllerList) {
            ConsumeStats consumeStats = this.defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().getConsumeStats(controller.getBrokerAddr(), this.consumer.getConsumerGroup(), this.consumer.getTopic(), 3000L);
            expectInflight += consumeStats.computeInflightTotalDiff();
        }
        Assert.assertEquals((long)0L, (long)expectInflight);
    }
}

