package org.apache.rocketmq.test.statictopic;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;

@FixMethodOrder
/* loaded from: input_file:org/apache/rocketmq/test/statictopic/StaticTopicIT.class */
public class StaticTopicIT extends BaseConf {
    // Class-wide log4j logger; it is not referenced anywhere else in this class.
    private static Logger logger = Logger.getLogger(StaticTopicIT.class);
    // Admin client shared by the test methods; assigned and started in setUp().
    private DefaultMQAdminExt defaultMQAdminExt;

    /**
     * Prepares the shared admin client before each test.
     * <p>
     * Sets the client rebalance wait interval system property to {@code 500}, obtains an
     * admin client for the name server, calls {@code waitBrokerRegistered} for the
     * cluster, and only afterwards starts the admin client.
     *
     * @throws Exception if waiting for the brokers or starting the admin client fails
     */
    @Before
    public void setUp() throws Exception {
        final String rebalanceWaitKey = "rocketmq.client.rebalance.waitInterval";
        System.setProperty(rebalanceWaitKey, "500");

        defaultMQAdminExt = getAdmin(nsAddr);
        waitBrokerRegistered(nsAddr, clusterName, brokerNum);
        defaultMQAdminExt.start();
    }

    /**
     * Creates and remaps a static topic through the command helpers, targeting the
     * cluster by name (no explicit broker set).
     * <p>
     * After creation, 100 messages are sent to each of the 10 queues and consumed. After
     * the remapping, another 100 messages per queue are sent and the expected max offset
     * of each queue is 200 (base offset 100 plus the 100 new messages).
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testCommandsWithCluster() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);
        final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());

        // Create on the cluster, then produce and consume one round.
        MQAdminTestUtils.createStaticTopicWithCommand(topic, queueNum, (Set<String>) null, clusterName, nsAddr);
        sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);

        // Remap against the same cluster; give the clients time to pick up the change.
        MQAdminTestUtils.remappingStaticTopicWithCommand(topic, (Set<String>) null, clusterName, nsAddr);
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 100L);
    }

    /**
     * Creates a static topic on broker1 through the command helper, then remaps it to
     * broker2, checking produce and consume after each step.
     * <p>
     * After the remapping the send check uses a base offset of 10000, and the final
     * consume check expects two rounds of messages per queue starting at block 0.
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testCommandsWithBrokers() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);
        final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());

        // Round 1: everything lives on broker1.
        final Set<String> createTargets = ImmutableSet.of(broker1Name);
        MQAdminTestUtils.createStaticTopicWithCommand(topic, queueNum, createTargets, (String) null, nsAddr);
        sendMessagesAndCheck(producer, createTargets, topic, queueNum, msgEachQueue, 0L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);

        // Round 2: move the queues to broker2 and wait for the metadata to refresh.
        final Set<String> remapTargets = ImmutableSet.of(broker2Name);
        MQAdminTestUtils.remappingStaticTopicWithCommand(topic, remapTargets, (String) null, nsAddr);
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, remapTargets, topic, queueNum, msgEachQueue, 10000L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
    }

    /**
     * Checks the topic configuration reported by brokers that are not a mapping target.
     * <p>
     * The topic is created on broker1 only; the configuration is still expected from
     * every broker ({@code brokerNum} entries), with broker2 reporting zero read queues,
     * zero write queues and no hosted queues. The topic is then remapped to broker2 and
     * the configuration is re-validated.
     *
     * @throws Exception if any admin call or consistency check fails
     */
    @Test
    public void testNoTargetBrokers() throws Exception {
        final int queueNum = 10;
        final String topic = "static" + MQRandomUtils.getRandomTopic();

        // Create the topic with broker1 as the only target.
        final Set<String> createTargets = new HashSet<String>();
        createTargets.add(broker1Name);
        MQAdminTestUtils.createStaticTopic(topic, queueNum, createTargets, defaultMQAdminExt);

        final Map<String, TopicConfigAndQueueMapping> configAfterCreate =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        Assert.assertEquals(brokerNum, configAfterCreate.size());
        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, configAfterCreate);
        final int mappedAfterCreate = TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(configAfterCreate.values())),
            false, true).size();
        Assert.assertEquals(queueNum, mappedAfterCreate);

        // broker2 was not a target, so it must not host anything yet.
        final TopicConfigAndQueueMapping broker2Config = configAfterCreate.get(broker2Name);
        Assert.assertEquals(0, broker2Config.getWriteQueueNums());
        Assert.assertEquals(0, broker2Config.getReadQueueNums());
        Assert.assertEquals(0, broker2Config.getMappingDetail().getHostedQueues().size());

        // Remap everything to broker2 and validate the configuration again.
        final Set<String> remapTargets = new HashSet<String>();
        remapTargets.add(broker2Name);
        MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);

        final Map<String, TopicConfigAndQueueMapping> configAfterRemap =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        Assert.assertEquals(brokerNum, configAfterRemap.size());
        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, configAfterRemap);
        final int mappedAfterRemap = TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(configAfterRemap.values())),
            false, true).size();
        Assert.assertEquals(queueNum, mappedAfterRemap);
    }

    /**
     * Sends messages to every queue of a static topic and verifies the resulting offsets.
     * <p>
     * First checks the producer's view of the topic: exactly {@code i} queues, each with
     * the expected topic, the mock "__global__" broker name and queue id equal to its
     * index, and each resolving (through the topic metadata) to a broker contained in
     * {@code set}. Then sends {@code i2} messages to each queue, expects no send errors,
     * and asserts that every queue has min offset 0 and max offset {@code i2 + j}, both
     * when queried per queue and through the topic stats table.
     *
     * @param rMQNormalProducer producer used to list the queues and send the messages
     * @param set               broker names the queues are allowed to resolve to
     * @param str               topic name
     * @param i                 expected number of queues
     * @param i2                number of messages sent to each queue
     * @param j                 base offset added to {@code i2} for the expected max offset
     * @throws Exception if sending or any admin query fails
     */
    private void sendMessagesAndCheck(RMQNormalProducer rMQNormalProducer, Set<String> set, String str, int i, int i2, long j) throws Exception {
        final ClientMetadata metadata = MQAdminUtils.getBrokerAndTopicMetadata(str, defaultMQAdminExt);
        final List<MessageQueue> queues = rMQNormalProducer.getMessageQueue();
        Assert.assertEquals(i, queues.size());

        // The producer must see logical queues 0..i-1 under the mock global broker name.
        for (int queueId = 0; queueId < i; queueId++) {
            final MessageQueue queue = queues.get(queueId);
            Assert.assertEquals(str, queue.getTopic());
            Assert.assertEquals(TopicQueueMappingUtils.getMockBrokerName("__global__"), queue.getBrokerName());
            Assert.assertEquals(queueId, queue.getQueueId());
            final String hostingBroker = metadata.getBrokerNameFromMessageQueue(queue);
            Assert.assertTrue(set.contains(hostingBroker));
        }

        for (MessageQueue queue : queues) {
            rMQNormalProducer.send(i2, queue);
        }
        Assert.assertEquals(0, rMQNormalProducer.getSendErrorMsg().size());

        // Short pause before the offsets are queried.
        Thread.sleep(100L);

        final long expectedMaxOffset = i2 + j;
        for (MessageQueue queue : queues) {
            Assert.assertEquals(0L, defaultMQAdminExt.minOffset(queue));
            Assert.assertEquals(expectedMaxOffset, defaultMQAdminExt.maxOffset(queue));
        }

        // The topic-level stats must report the same offsets.
        final TopicStatsTable stats = defaultMQAdminExt.examineTopicStats(str);
        for (MessageQueue queue : queues) {
            final TopicOffset offset = (TopicOffset) stats.getOffsetTable().get(queue);
            Assert.assertEquals(0L, offset.getMinOffset());
            Assert.assertEquals(expectedMaxOffset, offset.getMaxOffset());
        }
    }

    /**
     * Groups consumed messages by queue id and sorts each group by queue offset.
     *
     * @param collection consumed messages; every element is cast to {@link MessageExt}
     * @return a map from queue id to that queue's messages in ascending queue-offset order
     * @throws ClassCastException if an element is not a {@link MessageExt}
     */
    private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> collection) {
        final Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<Integer, List<MessageExt>>();
        for (Object element : collection) {
            final MessageExt message = (MessageExt) element;
            final Integer queueId = Integer.valueOf(message.getQueueId());
            List<MessageExt> bucket = messagesByQueue.get(queueId);
            if (bucket == null) {
                bucket = new ArrayList<MessageExt>();
                messagesByQueue.put(queueId, bucket);
            }
            bucket.add(message);
        }
        for (List<MessageExt> bucket : messagesByQueue.values()) {
            Collections.sort(bucket, new Comparator<MessageExt>() {
                @Override
                public int compare(MessageExt left, MessageExt right) {
                    // Compare the long offsets directly: casting their difference to int
                    // can truncate and flip the sign when the offsets are far apart.
                    return Long.compare(left.getQueueOffset(), right.getQueueOffset());
                }
            });
        }
        return messagesByQueue;
    }

    /**
     * Waits for the consumer to receive everything the producer sent and verifies the
     * per-queue content and logical offsets.
     * <p>
     * Waits up to 30000 ms for consumption, asserts that the consumed bodies match the
     * produced bodies exactly, then groups the consumed messages by queue. Each of the
     * {@code i} queues must hold {@code i2 * i4} messages; the message at sorted position
     * {@code p} must have queue offset {@code (p % i2) + (i3 + p / i2) * 10000}, i.e. each
     * round of {@code i2} messages starts 10000 after the previous round.
     *
     * @param rMQNormalProducer producer whose sent bodies are the expected set
     * @param rMQNormalConsumer consumer whose listener collected the messages
     * @param str               expected topic of every message
     * @param i                 expected number of queues
     * @param i2                number of messages per queue in one round
     * @param i3                index of the first round (multiplied by 10000 for the offset)
     * @param i4                number of rounds expected in each queue
     */
    private void consumeMessagesAndCheck(RMQNormalProducer rMQNormalProducer, RMQNormalConsumer rMQNormalConsumer, String str, int i, int i2, int i3, int i4) {
        rMQNormalConsumer.getListener().waitForMessageConsume(rMQNormalProducer.getAllMsgBody(), 30000);
        Truth.assertThat(VerifyUtils.getFilterdMessage(rMQNormalProducer.getAllMsgBody(), rMQNormalConsumer.getListener().getAllMsgBody())).containsExactlyElementsIn(rMQNormalProducer.getAllMsgBody());

        final Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(rMQNormalConsumer.getListener().getAllOriginMsg());
        Assert.assertEquals(i, messagesByQueue.size());

        final int expectedPerQueue = i2 * i4;
        for (int queueId = 0; queueId < i; queueId++) {
            final List<MessageExt> messages = messagesByQueue.get(Integer.valueOf(queueId));
            // The size check above does not guarantee the keys are exactly 0..i-1, so fail
            // with a readable message instead of a NullPointerException.
            Assert.assertNotNull("no messages consumed from queue " + queueId, messages);
            Assert.assertEquals(expectedPerQueue, messages.size());
            for (int position = 0; position < expectedPerQueue; position++) {
                final MessageExt message = messages.get(position);
                Assert.assertEquals(str, message.getTopic());
                Assert.assertEquals(TopicQueueMappingUtils.getMockBrokerName("__global__"), message.getBrokerName());
                Assert.assertEquals(queueId, message.getQueueId());
                // Computed in long so a large round index cannot overflow int.
                final long expectedOffset = (position % i2) + (long) (i3 + (position / i2)) * 10000L;
                Assert.assertEquals(expectedOffset, message.getQueueOffset());
            }
        }
    }

    /**
     * Creates a static topic on all brokers, verifies that the configuration read back
     * from the brokers equals the one returned at creation, then produces and consumes.
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testCreateProduceConsumeStaticTopic() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);
        final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());

        final Map<String, TopicConfigAndQueueMapping> createdConfig =
            MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
        final Map<String, TopicConfigAndQueueMapping> remoteConfig =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        Assert.assertEquals(brokerNum, remoteConfig.size());

        // The maps are parameterized so the entry set can be iterated with a typed
        // for-each; iterating a raw Map's entrySet() yields Object elements.
        for (Map.Entry<String, TopicConfigAndQueueMapping> remoteEntry : remoteConfig.entrySet()) {
            final TopicConfigAndQueueMapping created = createdConfig.get(remoteEntry.getKey());
            Assert.assertNotNull(created);
            Assert.assertEquals(remoteEntry.getValue(), created);
        }

        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteConfig);
        final int mappedQueues = TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(remoteConfig.values())),
            false, true).size();
        Assert.assertEquals(queueNum, mappedQueues);

        sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
    }

    /**
     * Creates a single-queue static topic on broker1, remaps it to broker2 and verifies
     * the mapping as well as produce and consume across the remapping.
     * <p>
     * After the remapping the queue must be hosted by broker2 and the last mapping item
     * must start at logic offset 10000; the following send check therefore uses 10000 as
     * base offset and the consume check expects two rounds of messages.
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testRemappingProduceConsumeStaticTopic() throws Exception {
        final int queueNum = 1;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);
        final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());

        // Round 1 on broker1.
        final Set<String> createTargets = ImmutableSet.of(broker1Name);
        MQAdminTestUtils.createStaticTopic(topic, queueNum, createTargets, defaultMQAdminExt);
        sendMessagesAndCheck(producer, createTargets, topic, queueNum, msgEachQueue, 0L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);

        // Remap to broker2 and inspect the resulting mapping.
        final Set<String> remapTargets = ImmutableSet.of(broker2Name);
        MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);
        final Map<String, TopicConfigAndQueueMapping> remoteConfig =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteConfig);
        // Parameterized so values() can be iterated with a typed for-each; a raw Map's
        // values() yields Object elements.
        final Map<Integer, TopicQueueMappingOne> mappingByQueue = TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(remoteConfig.values())),
            false, true);
        Assert.assertEquals(queueNum, mappingByQueue.size());
        for (TopicQueueMappingOne mapping : mappingByQueue.values()) {
            Assert.assertEquals(broker2Name, mapping.getBname());
            final List<LogicQueueMappingItem> items = mapping.getItems();
            Assert.assertEquals(10000L, items.get(items.size() - 1).getLogicOffset());
        }

        // Give the clients time to refresh the metadata, then run round 2 on broker2.
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, remapTargets, topic, queueNum, msgEachQueue, 10000L);
        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
    }

    /**
     * Verifies consumer offsets reported by the admin API across several remappings.
     * <p>
     * One round is produced and consumed on broker1, after which both clients are shut
     * down and the consume stats must show broker and consumer offset 100 for every
     * queue. The topic is then remapped to broker2, broker3 and broker1 in turn, with one
     * more round sent after each remapping. The stats must then show a broker offset of
     * {@code 100 + 3 * 10000} while the consumer offset is still 100. Finally a new
     * consumer in the same group must receive the three later rounds (starting at round 1).
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testDoubleReadCheckConsumerOffset() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final String group = initConsumerGroup();
        final RMQNormalProducer firstProducer = getProducer(nsAddr, topic);
        final RMQNormalConsumer firstConsumer = getConsumer(nsAddr, group, topic, "*", (AbstractListener) new RMQNormalListener());
        final long startTime = System.currentTimeMillis();

        // Round 0 on broker1, fully consumed.
        final Set<String> createTargets = ImmutableSet.of(broker1Name);
        MQAdminTestUtils.createStaticTopic(topic, queueNum, createTargets, defaultMQAdminExt);
        sendMessagesAndCheck(firstProducer, createTargets, topic, queueNum, msgEachQueue, 0L);
        consumeMessagesAndCheck(firstProducer, firstConsumer, topic, queueNum, msgEachQueue, 0, 1);
        firstProducer.shutdown();
        firstConsumer.shutdown();

        // With the clients gone, the stats must show everything consumed.
        final RMQNormalProducer secondProducer = getProducer(nsAddr, topic);
        final ConsumeStats statsBeforeRemap = defaultMQAdminExt.examineConsumeStats(group);
        final List<MessageQueue> queuesBeforeRemap = secondProducer.getMessageQueue();
        for (MessageQueue queue : queuesBeforeRemap) {
            final OffsetWrapper wrapper = (OffsetWrapper) statsBeforeRemap.getOffsetTable().get(queue);
            Assert.assertNotNull(wrapper);
            Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
            Assert.assertTrue(wrapper.getLastTimestamp() > startTime);
        }

        // Remap through broker2 -> broker3 -> broker1, sending one round after each hop.
        final List<String> remapOrder = ImmutableList.of(broker2Name, broker3Name, broker1Name);
        for (int hop = 0; hop < remapOrder.size(); hop++) {
            final Set<String> hopTarget = ImmutableSet.of(remapOrder.get(hop));
            MQAdminTestUtils.remappingStaticTopic(topic, hopTarget, defaultMQAdminExt);
            Thread.sleep(500L);
            sendMessagesAndCheck(secondProducer, hopTarget, topic, queueNum, msgEachQueue, (hop + 1) * 10000);
        }

        // Nothing consumed since: broker offset advanced, consumer offset unchanged.
        final ConsumeStats statsAfterRemap = defaultMQAdminExt.examineConsumeStats(group);
        final List<MessageQueue> queuesAfterRemap = secondProducer.getMessageQueue();
        for (MessageQueue queue : queuesAfterRemap) {
            final OffsetWrapper wrapper = (OffsetWrapper) statsAfterRemap.getOffsetTable().get(queue);
            Assert.assertNotNull(wrapper);
            Assert.assertEquals(msgEachQueue + (remapOrder.size() * 10000), wrapper.getBrokerOffset());
            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
            Assert.assertTrue(wrapper.getLastTimestamp() > startTime);
        }

        // A fresh consumer in the same group picks up the rounds sent after the remappings.
        final RMQNormalConsumer secondConsumer = getConsumer(nsAddr, group, topic, "*", (AbstractListener) new RMQNormalListener());
        consumeMessagesAndCheck(secondProducer, secondConsumer, topic, queueNum, msgEachQueue, 1, remapOrder.size());
    }

    /**
     * Remaps a static topic broker1 -> broker2 -> broker3 and checks what each broker
     * still hosts after the mapping clean services have run.
     * <p>
     * After the remappings and a first series of clean-service wake-ups, broker1 must
     * host no queues while broker2 and broker3 each host 10. Unused topics are then
     * cleaned from the message stores of broker1 and broker2, the clean services are
     * woken again, and the same hosting counts are asserted once more.
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testRemappingAndClear() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);

        // Create on broker1.
        final Set<String> firstTargets = ImmutableSet.of(broker1Name);
        MQAdminTestUtils.createStaticTopic(topic, queueNum, firstTargets, defaultMQAdminExt);
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, firstTargets, topic, queueNum, msgEachQueue, 0L);

        // Remap to broker2.
        final Set<String> secondTargets = ImmutableSet.of(broker2Name);
        MQAdminTestUtils.remappingStaticTopic(topic, secondTargets, defaultMQAdminExt);
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, secondTargets, topic, queueNum, msgEachQueue, 10000L);

        // Remap to broker3.
        final Set<String> thirdTargets = ImmutableSet.of(broker3Name);
        MQAdminTestUtils.remappingStaticTopic(topic, thirdTargets, defaultMQAdminExt);
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, thirdTargets, topic, queueNum, msgEachQueue, 20000L);

        // First clean pass: broker1 no longer hosts anything.
        wakeupMappingCleanServices();
        final Map<String, TopicConfigAndQueueMapping> configAfterFirstClean =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        Assert.assertEquals(brokerNum, configAfterFirstClean.size());
        final TopicConfigAndQueueMapping broker1First = configAfterFirstClean.get(broker1Name);
        final TopicConfigAndQueueMapping broker2First = configAfterFirstClean.get(broker2Name);
        final TopicConfigAndQueueMapping broker3First = configAfterFirstClean.get(broker3Name);
        Assert.assertEquals(0, broker1First.getMappingDetail().getHostedQueues().size());
        Assert.assertEquals(queueNum, broker2First.getMappingDetail().getHostedQueues().size());
        Assert.assertEquals(queueNum, broker3First.getMappingDetail().getHostedQueues().size());

        // Pass every topic known to broker1 except the test topic to cleanUnusedTopic on
        // broker1 and broker2 (presumably the set of topics to retain - confirm against
        // the message store API).
        final Set<String> otherTopics = new HashSet<String>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet());
        otherTopics.remove(topic);
        brokerController1.getMessageStore().cleanUnusedTopic(otherTopics);
        brokerController2.getMessageStore().cleanUnusedTopic(otherTopics);

        // Second clean pass.
        wakeupMappingCleanServices();
        final Map<String, TopicConfigAndQueueMapping> configAfterSecondClean =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        Assert.assertEquals(brokerNum, configAfterSecondClean.size());
        final TopicConfigAndQueueMapping broker1Second = configAfterSecondClean.get(broker1Name);
        final TopicConfigAndQueueMapping broker2Second = configAfterSecondClean.get(broker2Name);
        final TopicConfigAndQueueMapping broker3Second = configAfterSecondClean.get(broker3Name);
        Assert.assertEquals(0, broker1Second.getMappingDetail().getHostedQueues().size());
        Assert.assertEquals(queueNum, broker2Second.getMappingDetail().getHostedQueues().size());
        Assert.assertEquals(queueNum, broker3Second.getMappingDetail().getHostedQueues().size());

        // NOTE(review): broker1's hosted-queue map was just asserted to be empty, so this
        // loop never executes its body; the "3 items" expectation is currently unchecked.
        for (List<LogicQueueMappingItem> items : broker1Second.getMappingDetail().getHostedQueues().values()) {
            Assert.assertEquals(3, items.size());
        }
        // Every queue hosted by broker3 must be left with a single mapping item.
        for (List<LogicQueueMappingItem> items : broker3Second.getMappingDetail().getHostedQueues().values()) {
            Assert.assertEquals(1, items.size());
        }
    }

    /**
     * Wakes the topic-queue-mapping clean service of every broker controller ten times,
     * pausing 100 ms after each round.
     *
     * @throws InterruptedException if the pause between rounds is interrupted
     */
    private void wakeupMappingCleanServices() throws InterruptedException {
        for (int round = 0; round < 10; round++) {
            for (BrokerController controller : brokerControllerList) {
                controller.getTopicQueueMappingCleanService().wakeup();
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Remaps a static topic from broker1 to broker2 with the negative-logic-offset
     * helper and verifies the resulting mapping and offsets.
     * <p>
     * After the remapping every queue must be hosted by broker2 and its last mapping
     * item must carry logic offset -1. The following send check uses base offset 0, i.e.
     * it expects each queue's max offset to be 100 again.
     *
     * @throws Exception if any admin, send or verification step fails
     */
    @Test
    public void testRemappingWithNegativeLogicOffset() throws Exception {
        final int queueNum = 10;
        final int msgEachQueue = 100;
        final String topic = "static" + MQRandomUtils.getRandomTopic();
        final RMQNormalProducer producer = getProducer(nsAddr, topic);

        // Create on broker1 and send one round.
        final Set<String> createTargets = ImmutableSet.of(broker1Name);
        MQAdminTestUtils.createStaticTopic(topic, queueNum, createTargets, defaultMQAdminExt);
        sendMessagesAndCheck(producer, createTargets, topic, queueNum, msgEachQueue, 0L);

        // Remap to broker2 with a negative logic offset and inspect the mapping.
        final Set<String> remapTargets = ImmutableSet.of(broker2Name);
        MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, remapTargets, defaultMQAdminExt);
        final Map<String, TopicConfigAndQueueMapping> remoteConfig =
            MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteConfig);
        // Parameterized so values() can be iterated with a typed for-each; a raw Map's
        // values() yields Object elements.
        final Map<Integer, TopicQueueMappingOne> mappingByQueue = TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(remoteConfig.values())),
            false, true);
        Assert.assertEquals(queueNum, mappingByQueue.size());
        for (TopicQueueMappingOne mapping : mappingByQueue.values()) {
            Assert.assertEquals(broker2Name, mapping.getBname());
            final List<LogicQueueMappingItem> items = mapping.getItems();
            Assert.assertEquals(-1L, items.get(items.size() - 1).getLogicOffset());
        }

        // Give the clients time to refresh the metadata before sending again.
        Thread.sleep(500L);
        sendMessagesAndCheck(producer, remapTargets, topic, queueNum, msgEachQueue, 0L);
    }

    /**
     * Cleans up after each test: sets the client rebalance wait interval system property
     * to {@code 20000} and then calls {@code BaseConf.shutdown()}.
     */
    @After
    public void tearDown() {
        final String rebalanceWaitKey = "rocketmq.client.rebalance.waitInterval";
        System.setProperty(rebalanceWaitKey, "20000");
        BaseConf.shutdown();
    }
}
