/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.offset;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQBlockListener;
import org.apache.rocketmq.test.message.MessageQueueMsg;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

/**
 * Integration tests for consumer lag reporting and filtered message-count estimation.
 *
 * <p>Every test runs against the brokers in {@code brokerControllerList} with long polling
 * disabled, a 500 ms short-polling timeout and accumulation estimation enabled (see
 * {@link #setUp()}).
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class LagCalculationIT extends BaseConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(LagCalculationIT.class);

    /** Expression type passed to the filter APIs for SQL subscriptions. */
    private static final String SQL92_EXPRESSION_TYPE = "SQL92";
    /** Upper bound, in seconds, for every Awaitility poll in this class. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5L;
    /** Timeout handed to {@code waitForMessageConsume}. */
    private static final int CONSUME_TIMEOUT_MILLIS = 120000;

    private RMQNormalProducer producer;
    private RMQNormalConsumer consumer;
    private String topic;
    private RMQBlockListener blockListener;

    /**
     * Creates a fresh topic, reconfigures every broker for the test and starts one producer plus
     * one consumer subscribed to all tags through a (initially non-blocking) block listener.
     */
    @Before
    public void setUp() {
        topic = initTopic();
        LOGGER.info("use topic: {};", topic);
        for (BrokerController controller : brokerControllerList) {
            controller.getBrokerConfig().setLongPollingEnable(false);
            controller.getBrokerConfig().setShortPollingTimeMills(500L);
            controller.getBrokerConfig().setEstimateAccumulation(true);
        }
        producer = getProducer(NAMESRV_ADDR, topic);
        blockListener = new RMQBlockListener(false);
        consumer = getConsumer(NAMESRV_ADDR, topic, "*", blockListener);
    }

    /** Delegates to the shared {@code shutdown()} inherited from {@link BaseConf}. */
    @After
    public void tearDown() {
        shutdown();
    }

    /**
     * Sums the consume lag and pull lag of {@link #consumer} over the given queues.
     *
     * <p>For each queue the broker's max offset must equal the broker offset reported by
     * {@code examineConsumeStats} (asserted). If the reported consumer or pull offset differs
     * from what the broker's offset manager returns, the stats are treated as not yet
     * consistent and {@code (-1, -1)} is returned so callers can keep polling.
     *
     * @param mqs queues to inspect; queues whose broker name matches no controller are skipped
     * @return pair of (total consume lag, total pull lag), or {@code (-1, -1)} on inconsistency
     */
    private Pair<Long, Long> getLag(List<MessageQueue> mqs) {
        long lag = 0L;
        long pullLag = 0L;
        String group = consumer.getConsumerGroup();
        for (BrokerController controller : brokerControllerList) {
            ConsumeStats consumeStats =
                MQAdminTestUtils.examineConsumeStats(controller.getBrokerAddr(), topic, group);
            Map<MessageQueue, OffsetWrapper> offsetTable = consumeStats.getOffsetTable();
            String brokerName = controller.getBrokerConfig().getBrokerName();
            for (MessageQueue mq : mqs) {
                if (!mq.getBrokerName().equals(brokerName)) {
                    continue;
                }
                int queueId = mq.getQueueId();
                long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                long consumerOffset = controller.getConsumerOffsetManager().queryOffset(group, topic, queueId);
                long pullOffset = controller.getConsumerOffsetManager().queryPullOffset(group, topic, queueId);
                OffsetWrapper offsetWrapper = offsetTable.get(mq);
                // Fail with a readable message instead of a bare NullPointerException.
                Assert.assertNotNull("no consume stats reported for " + mq, offsetWrapper);
                Assert.assertEquals(brokerOffset, offsetWrapper.getBrokerOffset());
                if (offsetWrapper.getConsumerOffset() != consumerOffset
                    || offsetWrapper.getPullOffset() != pullOffset) {
                    return new Pair<>(-1L, -1L);
                }
                lag += brokerOffset - consumerOffset;
                pullLag += brokerOffset - pullOffset;
            }
        }
        return new Pair<>(lag, pullLag);
    }

    /**
     * Blocks until every broker's message store reports {@code dispatchBehindBytes() == 0},
     * failing after {@value #AWAIT_TIMEOUT_SECONDS} seconds.
     */
    public void waitForFullyDispatched() {
        Awaitility.await().atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            for (BrokerController controller : brokerControllerList) {
                if (controller.getMessageStore().dispatchBehindBytes() != 0L) {
                    return false;
                }
            }
            return true;
        });
    }

    /**
     * Checks the lag reported in three phases: everything consumed (both lags 0), listener
     * blocked (consume lag equals the number of sent messages, pull lag 0), and consumer shut
     * down (both lags equal the number of newly sent messages).
     */
    @Test
    public void testCalculateLag() {
        int msgSize = 10;
        List<MessageQueue> mqs = producer.getMessageQueue();
        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);

        // Phase 1: consume everything and persist offsets; no lag expected.
        producer.send(mqMsgs.getMsgsWithMQ());
        waitForFullyDispatched();
        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_TIMEOUT_MILLIS);
        consumer.getConsumer().getDefaultMQPushConsumerImpl().persistConsumerOffset();
        Awaitility.await().atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            Pair<Long, Long> lag = getLag(mqs);
            return lag.getObject1().longValue() == 0L && lag.getObject2().longValue() == 0L;
        });

        // Phase 2: block the listener; messages are pulled but not consumed.
        blockListener.setBlock(true);
        consumer.clearMsg();
        producer.clearMsg();
        producer.send(mqMsgs.getMsgsWithMQ());
        waitForFullyDispatched();
        Awaitility.await().atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            Pair<Long, Long> lag = getLag(mqs);
            long sent = producer.getAllMsgBody().size();
            return lag.getObject1().longValue() == sent && lag.getObject2().longValue() == 0L;
        });

        // Phase 3: drain, stop the consumer, then send again; nothing is pulled or consumed.
        blockListener.setBlock(false);
        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_TIMEOUT_MILLIS);
        consumer.shutdown();
        producer.clearMsg();
        producer.send(mqMsgs.getMsgsWithMQ());
        waitForFullyDispatched();
        Pair<Long, Long> lag = getLag(mqs);
        long expected = producer.getAllMsgBody().size();
        // Explicit unboxing keeps the call on assertEquals(long, long).
        Assert.assertEquals(expected, lag.getObject1().longValue());
        Assert.assertEquals(expected, lag.getObject2().longValue());
    }

    /**
     * Sends {@code repeat} rounds of untagged plus tagged messages per queue (tagged messages
     * carry a {@code value} user property equal to the round index) and verifies
     * {@code estimateMessageCount} for a tag filter and for a SQL filter.
     *
     * @throws Exception if building subscription data fails
     */
    @Test
    public void testEstimateLag() throws Exception {
        int msgNoTagSize = 80;
        int msgWithTagSize = 20;
        int repeat = 2;
        String tag = "TAG_FOR_TEST_ESTIMATE";
        String sql = "TAGS = 'TAG_FOR_TEST_ESTIMATE' And value < " + repeat / 2;
        MessageSelector selector = MessageSelector.bySql(sql);
        RMQBlockListener sqlListener = new RMQBlockListener(true);
        RMQSqlConsumer sqlConsumer =
            ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic, selector, sqlListener);
        RMQBlockListener tagListener = new RMQBlockListener(true);
        RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, tagListener);
        try {
            // Register the SQL filter data on every broker so it can be looked up below.
            SubscriptionData subscriptionData =
                FilterAPI.buildSubscriptionData(topic, sql, SQL92_EXPRESSION_TYPE);
            for (BrokerController controller : brokerControllerList) {
                controller.getConsumerFilterManager().register(
                    topic, sqlConsumer.getConsumerGroup(), sql, SQL92_EXPRESSION_TYPE, subscriptionData.getSubVersion());
            }
            Awaitility.await().atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> sqlListener.isBlocked() && tagListener.isBlocked());

            List<MessageQueue> mqs = producer.getMessageQueue();
            for (int i = 0; i < repeat; i++) {
                final int round = i;
                Map<MessageQueue, List<Object>> msgMap = new MessageQueueMsg(mqs, msgNoTagSize).getMsgsWithMQ();
                Map<MessageQueue, List<Object>> msgWithTagMap =
                    new MessageQueueMsg(mqs, msgWithTagSize, tag).getMsgsWithMQ();
                // Mix the tagged messages (stamped with the round index) into the untagged ones.
                msgMap.forEach((mq, msgList) -> {
                    List<Object> msgWithTagList = msgWithTagMap.get(mq);
                    for (Object o : msgWithTagList) {
                        ((Message) o).putUserProperty("value", String.valueOf(round));
                    }
                    msgList.addAll(msgWithTagList);
                    Collections.shuffle(msgList);
                });
                producer.send(msgMap);
            }
            // Same guard testCalculateLag uses after sending: queue offsets are read right
            // below, so wait until the brokers report nothing left to dispatch.
            waitForFullyDispatched();

            // Tag filter: every tagged message from every round should be counted.
            for (BrokerController controller : brokerControllerList) {
                String brokerName = controller.getBrokerConfig().getBrokerName();
                for (MessageQueue mq : mqs) {
                    if (!mq.getBrokerName().equals(brokerName)) {
                        continue;
                    }
                    long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
                    MessageFilter tagFilter = new DefaultMessageFilter(FilterAPI.buildSubscriptionData(topic, tag));
                    long estimateMessageCount = controller.getMessageStore()
                        .estimateMessageCount(topic, mq.getQueueId(), 0L, brokerOffset, tagFilter);
                    Assert.assertEquals(repeat * msgWithTagSize, estimateMessageCount);
                }
            }

            // SQL filter: only tagged messages whose "value" is below repeat / 2 should be counted.
            for (BrokerController controller : brokerControllerList) {
                String brokerName = controller.getBrokerConfig().getBrokerName();
                for (MessageQueue mq : mqs) {
                    if (!mq.getBrokerName().equals(brokerName)) {
                        continue;
                    }
                    long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
                    ConsumerFilterData consumerFilterData =
                        controller.getConsumerFilterManager().get(topic, sqlConsumer.getConsumerGroup());
                    MessageFilter sqlFilter = new ExpressionMessageFilter(
                        subscriptionData, consumerFilterData, controller.getConsumerFilterManager());
                    long estimateMessageCount = controller.getMessageStore()
                        .estimateMessageCount(topic, mq.getQueueId(), 0L, brokerOffset, sqlFilter);
                    Assert.assertEquals(repeat / 2 * msgWithTagSize, estimateMessageCount);
                }
            }
        } finally {
            // Shut the extra consumers down even when an assertion above fails.
            sqlConsumer.shutdown();
            tagConsumer.shutdown();
        }
    }
}

