package org.apache.rocketmq.test.client.producer.batch;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.apache.rocketmq.test.util.RandomUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/client/producer/batch/BatchSendIT.class */
public class BatchSendIT extends BaseConf {
    private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
    private String topic = null;
    private Random random = new Random();

    @Before
    public void setUp() {
        this.topic = initTopic();
        logger.info(String.format("user topic[%s]!", this.topic));
    }

    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    @Test
    public void testBatchSend_ViewMessage() throws Exception {
        Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore);
        Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(new Message(this.topic, RandomUtils.getStringByUUID().getBytes()));
        }
        DefaultMQProducer rMQProducer = ProducerFactory.getRMQProducer(nsAddr);
        removeBatchUniqueId(rMQProducer);
        SendResult send = rMQProducer.send(arrayList);
        Assert.assertEquals(SendStatus.SEND_OK, send.getSendStatus());
        String[] split = send.getOffsetMsgId().split(",");
        String[] split2 = send.getMsgId().split(",");
        Assert.assertEquals(arrayList.size(), split.length);
        Assert.assertEquals(arrayList.size(), split2.length);
        Thread.sleep(2000L);
        for (int i2 = 0; i2 < 3; i2++) {
            rMQProducer.viewMessage(split[this.random.nextInt(100)]);
        }
        for (int i3 = 0; i3 < 3; i3++) {
            rMQProducer.viewMessage(this.topic, split2[this.random.nextInt(100)]);
        }
    }

    @Test
    public void testBatchSend_SysInnerBatch() throws Exception {
        waitBrokerRegistered(nsAddr, clusterName, brokerNum);
        String uuid = UUID.randomUUID().toString();
        IntegrationTestBase.initTopic(uuid, nsAddr, clusterName, CQType.BatchCQ);
        Assert.assertEquals(CQType.BatchCQ.toString(), ((TopicConfig) brokerController1.getTopicConfigManager().getTopicConfigTable().get(uuid)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals(CQType.BatchCQ.toString(), ((TopicConfig) brokerController2.getTopicConfigManager().getTopicConfigTable().get(uuid)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals(CQType.BatchCQ.toString(), ((TopicConfig) brokerController3.getTopicConfigManager().getTopicConfigTable().get(uuid)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals(8L, ((TopicConfig) brokerController1.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(8L, ((TopicConfig) brokerController2.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(8L, ((TopicConfig) brokerController3.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(-1L, brokerController1.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(-1L, brokerController2.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(-1L, brokerController3.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController1.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController2.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController3.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        DefaultMQProducer rMQProducer = ProducerFactory.getRMQProducer(nsAddr);
        MessageQueue messageQueue = (MessageQueue) rMQProducer.fetchPublishMessageQueues(uuid).iterator().next();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 10; i2++) {
                arrayList.add(new Message(uuid, RandomUtils.getStringByUUID().getBytes()));
            }
            SendResult send = rMQProducer.send(arrayList, messageQueue);
            Assert.assertEquals(SendStatus.SEND_OK, send.getSendStatus());
            Assert.assertEquals(messageQueue.getQueueId(), send.getMessageQueue().getQueueId());
            Assert.assertEquals(i * 10, send.getQueueOffset());
            Assert.assertEquals(1L, send.getMsgId().split(",").length);
        }
        Thread.sleep(300L);
        PullResult pullBlockIfNotFound = ConsumerFactory.getRMQPullConsumer(nsAddr, "group").pullBlockIfNotFound(messageQueue, "*", 5L, 10 * 10);
        Assert.assertEquals(PullStatus.FOUND, pullBlockIfNotFound.getPullStatus());
        Assert.assertEquals(0L, pullBlockIfNotFound.getMinOffset());
        Assert.assertEquals(10 * 10, pullBlockIfNotFound.getMaxOffset());
        Assert.assertEquals(10 * 10, pullBlockIfNotFound.getMsgFoundList().size());
        MessageExt messageExt = (MessageExt) pullBlockIfNotFound.getMsgFoundList().get(0);
        for (int i3 = 0; i3 < pullBlockIfNotFound.getMsgFoundList().size(); i3++) {
            MessageExt messageExt2 = (MessageExt) pullBlockIfNotFound.getMsgFoundList().get(i3);
            if (i3 % 10 == 0) {
                messageExt = messageExt2;
            }
            Assert.assertEquals(i3, messageExt2.getQueueOffset());
            Assert.assertEquals(uuid, messageExt2.getTopic());
            Assert.assertEquals(messageQueue.getQueueId(), messageExt2.getQueueId());
            Assert.assertEquals(messageExt.getBornHostString(), messageExt2.getBornHostString());
            Assert.assertEquals(messageExt.getBornHostNameString(), messageExt2.getBornHostNameString());
            Assert.assertEquals(messageExt.getBornTimestamp(), messageExt2.getBornTimestamp());
            Assert.assertEquals(messageExt.getStoreTimestamp(), messageExt2.getStoreTimestamp());
        }
    }

    @Test
    public void testBatchSend_SysOuterBatch() throws Exception {
        Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore);
        Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore);
        Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore);
        String uuid = UUID.randomUUID().toString();
        IntegrationTestBase.initTopic(uuid, nsAddr, clusterName, CQType.SimpleCQ);
        Assert.assertEquals(8L, ((TopicConfig) brokerController1.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(8L, ((TopicConfig) brokerController2.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(8L, ((TopicConfig) brokerController3.getTopicConfigManager().getTopicConfigTable().get(uuid)).getReadQueueNums());
        Assert.assertEquals(0L, brokerController1.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController2.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController3.getMessageStore().getMinOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController1.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController2.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        Assert.assertEquals(0L, brokerController3.getMessageStore().getMaxOffsetInQueue(uuid, 0));
        DefaultMQProducer rMQProducer = ProducerFactory.getRMQProducer(nsAddr);
        MessageQueue messageQueue = (MessageQueue) rMQProducer.fetchPublishMessageQueues(uuid).iterator().next();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 10; i2++) {
                arrayList.add(new Message(uuid, RandomUtils.getStringByUUID().getBytes()));
            }
            SendResult send = rMQProducer.send(arrayList, messageQueue);
            Assert.assertEquals(SendStatus.SEND_OK, send.getSendStatus());
            Assert.assertEquals(messageQueue.getQueueId(), send.getMessageQueue().getQueueId());
            Assert.assertEquals(i * 10, send.getQueueOffset());
            Assert.assertEquals(10L, send.getMsgId().split(",").length);
        }
        Thread.sleep(300L);
        PullResult pullBlockIfNotFound = ConsumerFactory.getRMQPullConsumer(nsAddr, "group").pullBlockIfNotFound(messageQueue, "*", 5L, 10 * 10);
        Assert.assertEquals(PullStatus.FOUND, pullBlockIfNotFound.getPullStatus());
        Assert.assertEquals(0L, pullBlockIfNotFound.getMinOffset());
        Assert.assertEquals(10 * 10, pullBlockIfNotFound.getMaxOffset());
        Assert.assertEquals((10 * 10) - 5, pullBlockIfNotFound.getMsgFoundList().size());
        for (int i3 = 0; i3 < pullBlockIfNotFound.getMsgFoundList().size(); i3++) {
            MessageExt messageExt = (MessageExt) pullBlockIfNotFound.getMsgFoundList().get(i3);
            Assert.assertEquals(i3 + 5, messageExt.getQueueOffset());
            Assert.assertEquals(uuid, messageExt.getTopic());
            Assert.assertEquals(messageQueue.getQueueId(), messageExt.getQueueId());
        }
    }

    @Test
    public void testBatchSend_CheckProperties() throws Exception {
        ArrayList arrayList = new ArrayList();
        Message message = new Message();
        message.setTopic(this.topic);
        message.setKeys("keys123");
        message.setTags("tags123");
        message.setWaitStoreMsgOK(false);
        message.setBuyerId("buyerid123");
        message.setFlag(123);
        message.setBody("body".getBytes());
        arrayList.add(message);
        DefaultMQProducer rMQProducer = ProducerFactory.getRMQProducer(nsAddr);
        removeBatchUniqueId(rMQProducer);
        SendResult send = rMQProducer.send(arrayList);
        Assert.assertEquals(SendStatus.SEND_OK, send.getSendStatus());
        String[] split = send.getOffsetMsgId().split(",");
        String[] split2 = send.getMsgId().split(",");
        Assert.assertEquals(arrayList.size(), split.length);
        Assert.assertEquals(arrayList.size(), split2.length);
        Thread.sleep(2000L);
        MessageExt viewMessage = rMQProducer.viewMessage(split[0]);
        MessageExt viewMessage2 = rMQProducer.viewMessage(this.topic, split2[0]);
        System.out.println(viewMessage);
        System.out.println(viewMessage2);
        Assert.assertEquals(message.getTopic(), viewMessage2.getTopic());
        Assert.assertEquals(message.getTopic(), viewMessage.getTopic());
        Assert.assertEquals(message.getKeys(), viewMessage.getKeys());
        Assert.assertEquals(message.getKeys(), viewMessage2.getKeys());
        Assert.assertEquals(message.getTags(), viewMessage.getTags());
        Assert.assertEquals(message.getTags(), viewMessage2.getTags());
        Assert.assertEquals(Boolean.valueOf(message.isWaitStoreMsgOK()), Boolean.valueOf(viewMessage.isWaitStoreMsgOK()));
        Assert.assertEquals(Boolean.valueOf(message.isWaitStoreMsgOK()), Boolean.valueOf(viewMessage2.isWaitStoreMsgOK()));
        Assert.assertEquals(message.getBuyerId(), viewMessage.getBuyerId());
        Assert.assertEquals(message.getBuyerId(), viewMessage2.getBuyerId());
        Assert.assertEquals(message.getFlag(), viewMessage.getFlag());
        Assert.assertEquals(message.getFlag(), viewMessage2.getFlag());
    }

    private void removeBatchUniqueId(DefaultMQProducer defaultMQProducer) {
        defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() { // from class: org.apache.rocketmq.test.client.producer.batch.BatchSendIT.1
            public String hookName() {
                return null;
            }

            public void sendMessageBefore(SendMessageContext sendMessageContext) {
                MessageBatch message = sendMessageContext.getMessage();
                if (message.getProperty("UNIQ_KEY") != null) {
                    message.getProperties().remove("UNIQ_KEY");
                }
            }

            public void sendMessageAfter(SendMessageContext sendMessageContext) {
            }
        });
    }
}
