/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.client.producer.batch;

import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.apache.rocketmq.test.util.RandomUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BatchSendIT
extends BaseConf {
    private static Logger logger = LoggerFactory.getLogger(TagMessageWith1ConsumerIT.class);
    private String topic = null;
    private Random random = new Random();

    @Before
    public void setUp() {
        this.topic = BatchSendIT.initTopic();
        logger.info(String.format("user topic[%s]!", this.topic));
    }

    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    @Test
    public void testBatchSend_ViewMessage() throws Exception {
        int i;
        Assert.assertTrue((boolean)(brokerController1.getMessageStore() instanceof DefaultMessageStore));
        Assert.assertTrue((boolean)(brokerController2.getMessageStore() instanceof DefaultMessageStore));
        ArrayList<Message> messageList = new ArrayList<Message>();
        int batchNum = 100;
        for (int i2 = 0; i2 < batchNum; ++i2) {
            messageList.add(new Message(this.topic, RandomUtils.getStringByUUID().getBytes()));
        }
        DefaultMQProducer producer = ProducerFactory.getRMQProducer((String)NAMESRV_ADDR);
        this.removeBatchUniqueId(producer);
        SendResult sendResult = producer.send(messageList);
        Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
        String[] offsetIds = sendResult.getOffsetMsgId().split(",");
        String[] msgIds = sendResult.getMsgId().split(",");
        Assert.assertEquals((long)messageList.size(), (long)offsetIds.length);
        Assert.assertEquals((long)messageList.size(), (long)msgIds.length);
        Thread.sleep(2000L);
        for (i = 0; i < 3; ++i) {
            producer.viewMessage(offsetIds[this.random.nextInt(batchNum)]);
        }
        for (i = 0; i < 3; ++i) {
            producer.viewMessage(this.topic, msgIds[this.random.nextInt(batchNum)]);
        }
    }

    @Test
    public void testBatchSend_SysInnerBatch() throws Exception {
        BatchSendIT.waitBrokerRegistered(NAMESRV_ADDR, CLUSTER_NAME, 3);
        String batchTopic = UUID.randomUUID().toString();
        IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.BatchCQ);
        Assert.assertEquals((Object)CQType.BatchCQ.toString(), ((TopicConfig)brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals((Object)CQType.BatchCQ.toString(), ((TopicConfig)brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals((Object)CQType.BatchCQ.toString(), ((TopicConfig)brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)-1L, (long)brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)-1L, (long)brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)-1L, (long)brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        DefaultMQProducer producer = ProducerFactory.getRMQProducer((String)NAMESRV_ADDR);
        MessageQueue messageQueue = (MessageQueue)producer.fetchPublishMessageQueues(batchTopic).iterator().next();
        int batchCount = 10;
        int batchNum = 10;
        for (int i = 0; i < batchCount; ++i) {
            ArrayList<Message> messageList = new ArrayList<Message>();
            for (int j = 0; j < batchNum; ++j) {
                messageList.add(new Message(batchTopic, RandomUtils.getStringByUUID().getBytes()));
            }
            SendResult sendResult = producer.send(messageList, messageQueue);
            Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
            Assert.assertEquals((long)messageQueue.getQueueId(), (long)sendResult.getMessageQueue().getQueueId());
            Assert.assertEquals((long)(i * batchNum), (long)sendResult.getQueueOffset());
            Assert.assertEquals((long)1L, (long)sendResult.getMsgId().split(",").length);
        }
        Thread.sleep(300L);
        DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer((String)NAMESRV_ADDR, (String)"group");
        PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", 5L, batchCount * batchNum);
        Assert.assertEquals((Object)PullStatus.FOUND, (Object)pullResult.getPullStatus());
        Assert.assertEquals((long)0L, (long)pullResult.getMinOffset());
        Assert.assertEquals((long)(batchCount * batchNum), (long)pullResult.getMaxOffset());
        Assert.assertEquals((long)(batchCount * batchNum), (long)pullResult.getMsgFoundList().size());
        MessageExt first = (MessageExt)pullResult.getMsgFoundList().get(0);
        for (int i = 0; i < pullResult.getMsgFoundList().size(); ++i) {
            MessageExt messageExt = (MessageExt)pullResult.getMsgFoundList().get(i);
            if (i % batchNum == 0) {
                first = messageExt;
            }
            Assert.assertEquals((long)i, (long)messageExt.getQueueOffset());
            Assert.assertEquals((Object)batchTopic, (Object)messageExt.getTopic());
            Assert.assertEquals((long)messageQueue.getQueueId(), (long)messageExt.getQueueId());
            Assert.assertEquals((Object)first.getBornHostString(), (Object)messageExt.getBornHostString());
            Assert.assertEquals((Object)first.getBornHostNameString(), (Object)messageExt.getBornHostNameString());
            Assert.assertEquals((long)first.getBornTimestamp(), (long)messageExt.getBornTimestamp());
            Assert.assertEquals((long)first.getStoreTimestamp(), (long)messageExt.getStoreTimestamp());
        }
    }

    @Test
    public void testBatchSend_SysOuterBatch() throws Exception {
        Assert.assertTrue((boolean)(brokerController1.getMessageStore() instanceof DefaultMessageStore));
        Assert.assertTrue((boolean)(brokerController2.getMessageStore() instanceof DefaultMessageStore));
        Assert.assertTrue((boolean)(brokerController3.getMessageStore() instanceof DefaultMessageStore));
        String batchTopic = UUID.randomUUID().toString();
        IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)8L, (long)((TopicConfig)brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic)).getReadQueueNums());
        Assert.assertEquals((long)0L, (long)brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        Assert.assertEquals((long)0L, (long)brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
        DefaultMQProducer producer = ProducerFactory.getRMQProducer((String)NAMESRV_ADDR);
        MessageQueue messageQueue = (MessageQueue)producer.fetchPublishMessageQueues(batchTopic).iterator().next();
        int batchCount = 10;
        int batchNum = 10;
        for (int i = 0; i < batchCount; ++i) {
            ArrayList<Message> messageList = new ArrayList<Message>();
            for (int j = 0; j < batchNum; ++j) {
                messageList.add(new Message(batchTopic, RandomUtils.getStringByUUID().getBytes()));
            }
            SendResult sendResult = producer.send(messageList, messageQueue);
            Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
            Assert.assertEquals((long)messageQueue.getQueueId(), (long)sendResult.getMessageQueue().getQueueId());
            Assert.assertEquals((long)(i * batchNum), (long)sendResult.getQueueOffset());
            Assert.assertEquals((long)10L, (long)sendResult.getMsgId().split(",").length);
        }
        Thread.sleep(300L);
        DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer((String)NAMESRV_ADDR, (String)"group");
        long startOffset = 5L;
        PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", startOffset, batchCount * batchNum);
        Assert.assertEquals((Object)PullStatus.FOUND, (Object)pullResult.getPullStatus());
        Assert.assertEquals((long)0L, (long)pullResult.getMinOffset());
        Assert.assertEquals((long)(batchCount * batchNum), (long)pullResult.getMaxOffset());
        Assert.assertEquals((long)((long)(batchCount * batchNum) - startOffset), (long)pullResult.getMsgFoundList().size());
        for (int i = 0; i < pullResult.getMsgFoundList().size(); ++i) {
            MessageExt messageExt = (MessageExt)pullResult.getMsgFoundList().get(i);
            Assert.assertEquals((long)((long)i + startOffset), (long)messageExt.getQueueOffset());
            Assert.assertEquals((Object)batchTopic, (Object)messageExt.getTopic());
            Assert.assertEquals((long)messageQueue.getQueueId(), (long)messageExt.getQueueId());
        }
    }

    @Test
    public void testBatchSend_CheckProperties() throws Exception {
        ArrayList<Message> messageList = new ArrayList<Message>();
        Message message = new Message();
        message.setTopic(this.topic);
        message.setKeys("keys123");
        message.setTags("tags123");
        message.setWaitStoreMsgOK(false);
        message.setBuyerId("buyerid123");
        message.setFlag(123);
        message.setBody("body".getBytes());
        messageList.add(message);
        DefaultMQProducer producer = ProducerFactory.getRMQProducer((String)NAMESRV_ADDR);
        this.removeBatchUniqueId(producer);
        SendResult sendResult = producer.send(messageList);
        Assert.assertEquals((Object)SendStatus.SEND_OK, (Object)sendResult.getSendStatus());
        String[] offsetIds = sendResult.getOffsetMsgId().split(",");
        String[] msgIds = sendResult.getMsgId().split(",");
        Assert.assertEquals((long)messageList.size(), (long)offsetIds.length);
        Assert.assertEquals((long)messageList.size(), (long)msgIds.length);
        Thread.sleep(2000L);
        MessageExt messageByOffset = producer.viewMessage(offsetIds[0]);
        MessageExt messageByMsgId = producer.viewMessage(this.topic, msgIds[0]);
        Assert.assertEquals((Object)message.getTopic(), (Object)messageByMsgId.getTopic());
        Assert.assertEquals((Object)message.getTopic(), (Object)messageByOffset.getTopic());
        Assert.assertEquals((Object)message.getKeys(), (Object)messageByOffset.getKeys());
        Assert.assertEquals((Object)message.getKeys(), (Object)messageByMsgId.getKeys());
        Assert.assertEquals((Object)message.getTags(), (Object)messageByOffset.getTags());
        Assert.assertEquals((Object)message.getTags(), (Object)messageByMsgId.getTags());
        Assert.assertEquals((Object)message.isWaitStoreMsgOK(), (Object)messageByOffset.isWaitStoreMsgOK());
        Assert.assertEquals((Object)message.isWaitStoreMsgOK(), (Object)messageByMsgId.isWaitStoreMsgOK());
        Assert.assertEquals((Object)message.getBuyerId(), (Object)messageByOffset.getBuyerId());
        Assert.assertEquals((Object)message.getBuyerId(), (Object)messageByMsgId.getBuyerId());
        Assert.assertEquals((long)message.getFlag(), (long)messageByOffset.getFlag());
        Assert.assertEquals((long)message.getFlag(), (long)messageByMsgId.getFlag());
    }

    private void removeBatchUniqueId(DefaultMQProducer producer) {
        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook(){

            public String hookName() {
                return null;
            }

            public void sendMessageBefore(SendMessageContext context) {
                MessageBatch messageBatch = (MessageBatch)context.getMessage();
                if (messageBatch.getProperty("UNIQ_KEY") != null) {
                    messageBatch.getProperties().remove("UNIQ_KEY");
                }
            }

            public void sendMessageAfter(SendMessageContext context) {
            }
        });
    }
}

