package org.apache.rocketmq.test.offset;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQBlockListener;
import org.apache.rocketmq.test.message.MessageQueueMsg;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
/* loaded from: input_file:org/apache/rocketmq/test/offset/LagCalculationIT.class */
public class LagCalculationIT extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(LagCalculationIT.class);
    private RMQNormalProducer producer = null;
    private RMQNormalConsumer consumer = null;
    private String topic = null;
    private RMQBlockListener blockListener = null;

    @Before
    public void setUp() {
        this.topic = initTopic();
        LOGGER.info(String.format("use topic: %s;", this.topic));
        for (BrokerController brokerController : brokerControllerList) {
            brokerController.getBrokerConfig().setLongPollingEnable(false);
            brokerController.getBrokerConfig().setShortPollingTimeMills(500L);
            brokerController.getBrokerConfig().setEstimateAccumulation(true);
        }
        this.producer = getProducer(NAMESRV_ADDR, this.topic);
        this.blockListener = new RMQBlockListener(false);
        this.consumer = getConsumer(NAMESRV_ADDR, this.topic, "*", this.blockListener);
    }

    @After
    public void tearDown() {
        shutdown();
    }

    private Pair<Long, Long> getLag(List<MessageQueue> list) {
        long j = 0;
        long j2 = 0;
        for (BrokerController brokerController : brokerControllerList) {
            Map offsetTable = MQAdminTestUtils.examineConsumeStats(brokerController.getBrokerAddr(), this.topic, this.consumer.getConsumerGroup()).getOffsetTable();
            for (MessageQueue messageQueue : list) {
                if (messageQueue.getBrokerName().equals(brokerController.getBrokerConfig().getBrokerName())) {
                    long maxOffsetInQueue = brokerController.getMessageStore().getMaxOffsetInQueue(this.topic, messageQueue.getQueueId());
                    long queryOffset = brokerController.getConsumerOffsetManager().queryOffset(this.consumer.getConsumerGroup(), this.topic, messageQueue.getQueueId());
                    long queryPullOffset = brokerController.getConsumerOffsetManager().queryPullOffset(this.consumer.getConsumerGroup(), this.topic, messageQueue.getQueueId());
                    OffsetWrapper offsetWrapper = (OffsetWrapper) offsetTable.get(messageQueue);
                    Assert.assertEquals(maxOffsetInQueue, offsetWrapper.getBrokerOffset());
                    if (offsetWrapper.getConsumerOffset() != queryOffset || offsetWrapper.getPullOffset() != queryPullOffset) {
                        return new Pair<>(-1L, -1L);
                    }
                    j += maxOffsetInQueue - queryOffset;
                    j2 += maxOffsetInQueue - queryPullOffset;
                }
            }
        }
        return new Pair<>(Long.valueOf(j), Long.valueOf(j2));
    }

    public void waitForFullyDispatched() {
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            Iterator<BrokerController> it = brokerControllerList.iterator();
            while (it.hasNext()) {
                if (it.next().getMessageStore().dispatchBehindBytes() != 0) {
                    return false;
                }
            }
            return true;
        });
    }

    @Test
    public void testCalculateLag() {
        List<MessageQueue> messageQueue = this.producer.getMessageQueue();
        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(messageQueue, 10);
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        waitForFullyDispatched();
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        this.consumer.getConsumer().getDefaultMQPushConsumerImpl().persistConsumerOffset();
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            Pair<Long, Long> lag = getLag(messageQueue);
            return Boolean.valueOf(((Long) lag.getObject1()).longValue() == 0 && ((Long) lag.getObject2()).longValue() == 0);
        });
        this.blockListener.setBlock(true);
        this.consumer.clearMsg();
        this.producer.clearMsg();
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        waitForFullyDispatched();
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            Pair<Long, Long> lag = getLag(messageQueue);
            return Boolean.valueOf(((Long) lag.getObject1()).longValue() == ((long) this.producer.getAllMsgBody().size()) && ((Long) lag.getObject2()).longValue() == 0);
        });
        this.blockListener.setBlock(false);
        this.consumer.getListener().waitForMessageConsume(this.producer.getAllMsgBody(), 120000);
        this.consumer.shutdown();
        this.producer.clearMsg();
        this.producer.send(messageQueueMsg.getMsgsWithMQ());
        waitForFullyDispatched();
        Pair<Long, Long> lag = getLag(messageQueue);
        Assert.assertEquals(this.producer.getAllMsgBody().size(), ((Long) lag.getObject1()).longValue());
        Assert.assertEquals(this.producer.getAllMsgBody().size(), ((Long) lag.getObject2()).longValue());
    }

    @Test
    public void testEstimateLag() throws Exception {
        MessageSelector bySql = MessageSelector.bySql("TAGS = 'TAG_FOR_TEST_ESTIMATE' And value < " + (2 / 2));
        RMQBlockListener rMQBlockListener = new RMQBlockListener(true);
        RMQSqlConsumer rMQSqlConsumer = ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), this.topic, bySql, rMQBlockListener);
        RMQBlockListener rMQBlockListener2 = new RMQBlockListener(true);
        RMQNormalConsumer consumer = getConsumer(NAMESRV_ADDR, this.topic, "TAG_FOR_TEST_ESTIMATE", rMQBlockListener2);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(rMQBlockListener.isBlocked() && rMQBlockListener2.isBlocked());
        });
        List<MessageQueue> messageQueue = this.producer.getMessageQueue();
        for (int i = 0; i < 2; i++) {
            Map msgsWithMQ = new MessageQueueMsg(messageQueue, 80).getMsgsWithMQ();
            Map msgsWithMQ2 = new MessageQueueMsg(messageQueue, 20, "TAG_FOR_TEST_ESTIMATE").getMsgsWithMQ();
            int i2 = i;
            msgsWithMQ.forEach((messageQueue2, list) -> {
                List list = (List) msgsWithMQ2.get(messageQueue2);
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ((Message) it.next()).putUserProperty("value", String.valueOf(i2));
                }
                list.addAll(list);
                Collections.shuffle(list);
            });
            this.producer.send(msgsWithMQ);
        }
        for (BrokerController brokerController : brokerControllerList) {
            for (MessageQueue messageQueue3 : messageQueue) {
                if (messageQueue3.getBrokerName().equals(brokerController.getBrokerConfig().getBrokerName())) {
                    Assert.assertEquals(2 * 20, brokerController.getMessageStore().estimateMessageCount(this.topic, messageQueue3.getQueueId(), 0L, brokerController.getMessageStore().getMaxOffsetInQueue(this.topic, messageQueue3.getQueueId()), new DefaultMessageFilter(FilterAPI.buildSubscriptionData(this.topic, "TAG_FOR_TEST_ESTIMATE"))));
                }
            }
        }
        for (BrokerController brokerController2 : brokerControllerList) {
            for (MessageQueue messageQueue4 : messageQueue) {
                if (messageQueue4.getBrokerName().equals(brokerController2.getBrokerConfig().getBrokerName())) {
                    Assert.assertEquals((2 / 2) * 20, brokerController2.getMessageStore().estimateMessageCount(this.topic, messageQueue4.getQueueId(), 0L, brokerController2.getMessageStore().getMaxOffsetInQueue(this.topic, messageQueue4.getQueueId()), new ExpressionMessageFilter(brokerController2.getConsumerManager().findSubscriptionData(rMQSqlConsumer.getConsumerGroup(), this.topic), brokerController2.getConsumerFilterManager().get(this.topic, rMQSqlConsumer.getConsumerGroup()), brokerController2.getConsumerFilterManager())));
                }
            }
        }
        rMQSqlConsumer.shutdown();
        consumer.shutdown();
    }
}
