package org.apache.rocketmq.test.lmq.benchmark;

import com.google.common.math.IntMath;
import com.google.common.math.LongMath;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.util.StatUtil;

/* loaded from: input_file:org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.class */
public class BenchLmqStore {
    public static DefaultMQProducer defaultMQProducer;
    private static final String LMQ_PREFIX = "%LMQ%";
    private static Logger logger = LoggerFactory.getLogger(BenchLmqStore.class);
    private static String namesrv = System.getProperty("namesrv", "127.0.0.1:9876");
    private static String lmqTopic = System.getProperty("lmqTopic", "lmqTestTopic");
    private static boolean enableSub = Boolean.parseBoolean(System.getProperty("enableSub", "true"));
    private static String queuePrefix = System.getProperty("queuePrefix", "lmqTest");
    private static int tps = Integer.parseInt(System.getProperty("tps", "1"));
    private static int lmqNum = Integer.parseInt(System.getProperty("lmqNum", "1"));
    private static int sendThreadNum = Integer.parseInt(System.getProperty("sendThreadNum", "64"));
    private static int consumerThreadNum = Integer.parseInt(System.getProperty("consumerThreadNum", "64"));
    private static String brokerName = System.getProperty("brokerName", "broker-a");
    private static int size = Integer.parseInt(System.getProperty("size", "128"));
    private static int suspendTime = Integer.parseInt(System.getProperty("suspendTime", "2000"));
    private static final boolean RETRY_NO_MATCHED_MSG = Boolean.parseBoolean(System.getProperty("retry_no_matched_msg", "false"));
    private static boolean benchOffset = Boolean.parseBoolean(System.getProperty("benchOffset", "false"));
    private static int benchOffsetNum = Integer.parseInt(System.getProperty("benchOffsetNum", "1"));
    private static Map<MessageQueue, Long> offsetMap = new ConcurrentHashMap(256);
    private static Map<MessageQueue, Boolean> pullStatus = new ConcurrentHashMap(256);
    private static Map<Integer, Map<MessageQueue, Long>> pullEvent = new ConcurrentHashMap(256);
    private static int pullConsumerNum = Integer.parseInt(System.getProperty("pullConsumerNum", "8"));
    public static DefaultMQPullConsumer[] defaultMQPullConsumers = new DefaultMQPullConsumer[pullConsumerNum];
    private static AtomicLong rid = new AtomicLong();

    public static void main(String[] strArr) throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
        defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setProducerGroup("PID_LMQ_TEST");
        defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setNamesrvAddr(namesrv);
        defaultMQProducer.start();
        for (int i = 0; i < pullConsumerNum; i++) {
            DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
            defaultMQPullConsumers[i] = defaultMQPullConsumer;
            defaultMQPullConsumer.setNamesrvAddr(namesrv);
            defaultMQPullConsumer.setVipChannelEnabled(false);
            defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + i);
            defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i);
            defaultMQPullConsumer.setRegisterTopics(new HashSet(Collections.singletonList(lmqTopic)));
            defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime);
            defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000);
            defaultMQPullConsumer.start();
        }
        Thread.sleep(RMQPopConsumer.POP_TIMEOUT);
        if (benchOffset) {
            doBenchOffset();
            return;
        }
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(consumerThreadNum, (ThreadFactory) new ThreadFactoryImpl("test"));
        for (int i2 = 0; i2 < consumerThreadNum; i2++) {
            int i3 = i2;
            scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
                try {
                    Map<MessageQueue, Long> map = pullEvent.get(Integer.valueOf(i3));
                    if (map == null) {
                        return;
                    }
                    for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
                        try {
                        } catch (Exception e) {
                            logger.error("pull broker msg error", e);
                        }
                        if (!Boolean.TRUE.equals(pullStatus.get(entry.getKey()))) {
                            doPull(map, entry.getKey(), entry.getValue());
                        }
                    }
                } catch (Exception e2) {
                    logger.error("exec doPull task error", e2);
                }
            }, 1L, 1L, TimeUnit.MILLISECONDS);
        }
        if (enableSub && lmqNum > 0 && StringUtils.isNotBlank(brokerName)) {
            for (int i4 = 0; i4 < lmqNum; i4++) {
                long incrementAndGet = rid.incrementAndGet();
                String str = LMQ_PREFIX + queuePrefix + LongMath.mod(incrementAndGet, lmqNum);
                MessageQueue messageQueue = new MessageQueue(str, brokerName, 0);
                int mod = IntMath.mod(str.hashCode(), consumerThreadNum);
                pullEvent.putIfAbsent(Integer.valueOf(mod), new ConcurrentHashMap());
                pullEvent.get(Integer.valueOf(mod)).put(messageQueue, Long.valueOf(incrementAndGet));
            }
        }
        Thread.sleep(5000L);
        doSend();
    }

    public static void doSend() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < size; i += 10) {
            sb.append("hello baby");
        }
        byte[] bytes = sb.toString().getBytes(StandardCharsets.UTF_8);
        String str = "pub";
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(sendThreadNum);
        for (int i2 = 0; i2 < sendThreadNum; i2++) {
            newFixedThreadPool.execute(() -> {
                while (true) {
                    if (StatUtil.isOverFlow(str, tps)) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        long incrementAndGet = rid.incrementAndGet();
                        Message message = new Message(lmqTopic, bytes);
                        String str2 = lmqTopic;
                        if (lmqNum > 0) {
                            str2 = LMQ_PREFIX + queuePrefix + (incrementAndGet % lmqNum);
                            message.putUserProperty("INNER_MULTI_DISPATCH", str2);
                        }
                        SendResult send = defaultMQProducer.send(message);
                        StatUtil.addInvoke(str, System.currentTimeMillis() - currentTimeMillis);
                        if (StatUtil.nowTps(str) < 10) {
                            logger.warn("pub: {} ", send.getMsgId());
                        }
                        if (enableSub && null != send.getMessageQueue()) {
                            MessageQueue messageQueue = new MessageQueue(str2, send.getMessageQueue().getBrokerName(), lmqNum > 0 ? 0 : send.getMessageQueue().getQueueId());
                            int mod = IntMath.mod(str2.hashCode(), consumerThreadNum);
                            pullEvent.putIfAbsent(Integer.valueOf(mod), new ConcurrentHashMap());
                            pullEvent.get(Integer.valueOf(mod)).put(messageQueue, Long.valueOf(incrementAndGet));
                        }
                    } catch (Exception e2) {
                        logger.error("", e2);
                        StatUtil.addInvoke(str, System.currentTimeMillis() - currentTimeMillis, false);
                    }
                }
            });
        }
    }

    public static void doPull(final Map<MessageQueue, Long> map, final MessageQueue messageQueue, final Long l) throws RemotingException, InterruptedException, MQClientException {
        if (!enableSub) {
            map.remove(messageQueue, l);
            pullStatus.remove(messageQueue);
            return;
        }
        DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[(int) (l.longValue() % pullConsumerNum)];
        Long l2 = offsetMap.get(messageQueue);
        if (l2 == null) {
            long currentTimeMillis = System.currentTimeMillis();
            l2 = Long.valueOf(defaultMQPullConsumer.maxOffset(messageQueue));
            StatUtil.addInvoke("maxOffset", System.currentTimeMillis() - currentTimeMillis);
            offsetMap.put(messageQueue, l2);
        }
        final long currentTimeMillis2 = System.currentTimeMillis();
        if (null != pullStatus.putIfAbsent(messageQueue, true)) {
            return;
        }
        defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", l2.longValue(), 32, new PullCallback() { // from class: org.apache.rocketmq.test.lmq.benchmark.BenchLmqStore.1
            public void onSuccess(PullResult pullResult) {
                StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - currentTimeMillis2);
                map.remove(messageQueue, l);
                BenchLmqStore.pullStatus.remove(messageQueue);
                BenchLmqStore.offsetMap.put(messageQueue, Long.valueOf(pullResult.getNextBeginOffset()));
                StatUtil.addInvoke("doPull", System.currentTimeMillis() - currentTimeMillis2);
                if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && BenchLmqStore.RETRY_NO_MATCHED_MSG) {
                    map.put(messageQueue, Long.valueOf(BenchLmqStore.rid.incrementAndGet()));
                }
                List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                if (msgFoundList == null || msgFoundList.isEmpty()) {
                    StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - currentTimeMillis2);
                    return;
                }
                for (MessageExt messageExt : msgFoundList) {
                    StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp());
                    if (StatUtil.nowTps("sub") < 10) {
                        BenchLmqStore.logger.warn("sub: {}", messageExt.getMsgId());
                    }
                }
            }

            public void onException(Throwable th) {
                map.remove(messageQueue, l);
                BenchLmqStore.pullStatus.remove(messageQueue);
                BenchLmqStore.logger.error("", th);
                StatUtil.addInvoke("doPull", System.currentTimeMillis() - currentTimeMillis2, false);
            }
        });
    }

    public static void doBenchOffset() throws RemotingException, InterruptedException, MQClientException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(sendThreadNum);
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final String str = "benchOffset";
        HashMap brokerAddrs = ((BrokerData) defaultMQPullConsumers[0].getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().getTopicRouteInfoFromNameServer(lmqTopic, RMQPopConsumer.POP_TIMEOUT).getBrokerDatas().get(0)).getBrokerAddrs();
        if (brokerAddrs == null || brokerAddrs.isEmpty()) {
            return;
        }
        final String str2 = (String) brokerAddrs.get(0L);
        for (int i = 0; i < sendThreadNum; i++) {
            final int i2 = i;
            newFixedThreadPool.execute(new Runnable() { // from class: org.apache.rocketmq.test.lmq.benchmark.BenchLmqStore.2
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            if (StatUtil.isOverFlow(str, BenchLmqStore.tps)) {
                                Thread.sleep(100L);
                            }
                            long currentTimeMillis = System.currentTimeMillis();
                            long incrementAndGet = BenchLmqStore.rid.incrementAndGet();
                            DefaultMQPullConsumer defaultMQPullConsumer = BenchLmqStore.defaultMQPullConsumers[(Integer.MAX_VALUE & ((int) incrementAndGet)) % BenchLmqStore.defaultMQPullConsumers.length];
                            String str3 = BenchLmqStore.LMQ_PREFIX + BenchLmqStore.queuePrefix + (incrementAndGet % BenchLmqStore.benchOffsetNum);
                            String str4 = "%LMQ%GID_LMQ@@c" + i2 + "-" + (incrementAndGet % BenchLmqStore.benchOffsetNum);
                            concurrentHashMap.putIfAbsent(str3, 0L);
                            long longValue = ((Long) concurrentHashMap.get(str3)).longValue() + 1;
                            UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = new UpdateConsumerOffsetRequestHeader();
                            updateConsumerOffsetRequestHeader.setTopic(str3);
                            updateConsumerOffsetRequestHeader.setConsumerGroup(str4);
                            updateConsumerOffsetRequestHeader.setQueueId(0);
                            updateConsumerOffsetRequestHeader.setCommitOffset(Long.valueOf(longValue));
                            defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().updateConsumerOffset(str2, updateConsumerOffsetRequestHeader, 1000L);
                            QueryConsumerOffsetRequestHeader queryConsumerOffsetRequestHeader = new QueryConsumerOffsetRequestHeader();
                            queryConsumerOffsetRequestHeader.setTopic(str3);
                            queryConsumerOffsetRequestHeader.setConsumerGroup(str4);
                            queryConsumerOffsetRequestHeader.setQueueId(0);
                            long queryConsumerOffset = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().queryConsumerOffset(str2, queryConsumerOffsetRequestHeader, 1000L);
                            concurrentHashMap.put(str3, Long.valueOf(queryConsumerOffset));
                            if (longValue != queryConsumerOffset) {
                                StatUtil.addInvoke("ErrorOffset", 1L);
                            }
                            StatUtil.addInvoke(str, System.currentTimeMillis() - currentTimeMillis);
                        } catch (Exception e) {
                            BenchLmqStore.logger.error("", e);
                        }
                    }
                }
            });
        }
    }
}
