/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.dledger;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration test that starts a single broker with the DLedger commit log enabled,
 * sends a fixed number of messages to a one-queue topic and pulls them back,
 * checking offsets and payloads on both the produce and the consume side.
 */
public class DLedgerProduceAndConsumeIT {
    /** Number of messages sent and then pulled back by the test. */
    private static final int MESSAGE_COUNT = 10;

    /** Commit log mapped-file size configured for the test broker (10 MiB, i.e. 0xA00000). */
    private static final int COMMIT_LOG_FILE_SIZE = 10 * 1024 * 1024;

    /** Maximum number of messages requested in the single pull call. */
    private static final int PULL_BATCH_SIZE = 32;

    /** The only queue id of the topic created by the test. */
    private static final int QUEUE_ID = 0;

    /**
     * Builds a broker configuration bound to the loopback address and the shared test name server.
     *
     * @param cluster    cluster name to assign to the broker
     * @param brokerName broker name to assign to the broker
     * @return a new {@link BrokerConfig} carrying the given names
     */
    public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setBrokerClusterName(cluster);
        brokerConfig.setBrokerName(brokerName);
        brokerConfig.setBrokerIP1("127.0.0.1");
        brokerConfig.setNamesrvAddr(BaseConf.NAMESRV_ADDR);
        return brokerConfig;
    }

    /**
     * Builds a message store configuration with the DLedger commit log enabled.
     * The store lives in a fresh directory obtained from
     * {@link IntegrationTestBase#createBaseDir()}.
     *
     * @param brokerName used as the DLedger group name
     * @param peers      DLedger peers string
     * @param selfId     DLedger id of this node
     * @return a new {@link MessageStoreConfig}
     */
    public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) {
        MessageStoreConfig storeConfig = new MessageStoreConfig();
        String baseDir = IntegrationTestBase.createBaseDir();
        storeConfig.setStorePathRootDir(baseDir);
        storeConfig.setStorePathCommitLog(baseDir + "_" + "commitlog");
        storeConfig.setHaListenPort(0);
        storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_FILE_SIZE);
        storeConfig.setEnableDLegerCommitLog(true);
        storeConfig.setdLegerGroup(brokerName);
        storeConfig.setdLegerSelfId(selfId);
        storeConfig.setdLegerPeers(peers);
        return storeConfig;
    }

    /**
     * Starts a single-node DLedger broker, produces {@link #MESSAGE_COUNT} messages and pulls
     * them back. The broker and both clients are shut down in {@code finally} blocks so that a
     * failed assertion does not leave them running for the remainder of the test suite.
     *
     * @throws Exception if broker start-up, sending, pulling or the wait is interrupted or fails
     */
    @Test
    public void testProduceAndConsume() throws Exception {
        String cluster = UUID.randomUUID().toString();
        String brokerName = UUID.randomUUID().toString();
        String selfId = "n0";
        String peers = String.format("n0-localhost:%d", 0);
        BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
        MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);

        BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
        try {
            BaseConf.waitBrokerRegistered(BaseConf.NAMESRV_ADDR, brokerConfig.getBrokerName(), 1);
            // NOTE(review): presumably the role is updated once this node becomes DLedger leader - confirm.
            Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());

            String topic = UUID.randomUUID().toString();
            String consumerGroup = UUID.randomUUID().toString();
            IntegrationTestBase.initTopic(topic, BaseConf.NAMESRV_ADDR, cluster, 1, CQType.SimpleCQ);

            DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.NAMESRV_ADDR);
            try {
                DefaultMQPullConsumer consumer =
                    ConsumerFactory.getRMQPullConsumer(BaseConf.NAMESRV_ADDR, consumerGroup);
                try {
                    sendAndVerify(producer, topic, brokerName);

                    // Give the store a moment before reading the queue offsets.
                    // NOTE(review): looks like this waits for consume-queue dispatch - confirm.
                    Thread.sleep(500L);
                    Assert.assertEquals(0L,
                        brokerController.getMessageStore().getMinOffsetInQueue(topic, QUEUE_ID));
                    Assert.assertEquals(MESSAGE_COUNT,
                        brokerController.getMessageStore().getMaxOffsetInQueue(topic, QUEUE_ID));

                    pullAndVerify(consumer, topic, brokerName);
                } finally {
                    consumer.shutdown();
                }
            } finally {
                producer.shutdown();
            }
        } finally {
            brokerController.shutdown();
        }
    }

    /** Sends {@link #MESSAGE_COUNT} messages and checks status, queue, broker and offset of each send result. */
    private static void sendAndVerify(DefaultMQProducer producer, String topic, String brokerName)
        throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            Message message = new Message();
            message.setTopic(topic);
            message.setBody(bodyFor(i));
            SendResult sendResult = producer.send(message);
            Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
            Assert.assertEquals(QUEUE_ID, sendResult.getMessageQueue().getQueueId());
            Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName());
            Assert.assertEquals(i, sendResult.getQueueOffset());
            Assert.assertNotNull(sendResult.getMsgId());
            Assert.assertNotNull(sendResult.getOffsetMsgId());
        }
    }

    /** Pulls from offset 0 of the test queue and checks that every sent message comes back in order. */
    private static void pullAndVerify(DefaultMQPullConsumer consumer, String topic, String brokerName)
        throws Exception {
        MessageQueue messageQueue = new MessageQueue(topic, brokerName, QUEUE_ID);
        PullResult pullResult = consumer.pull(messageQueue, "*", 0L, PULL_BATCH_SIZE);
        Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
        Assert.assertEquals(MESSAGE_COUNT, pullResult.getMsgFoundList().size());
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            MessageExt messageExt = pullResult.getMsgFoundList().get(i);
            Assert.assertEquals(i, messageExt.getQueueOffset());
            Assert.assertArrayEquals(bodyFor(i), messageExt.getBody());
        }
    }

    /** Returns the payload of the message with the given index, encoded with an explicit charset. */
    private static byte[] bodyFor(int index) {
        return ("Hello" + index).getBytes(StandardCharsets.UTF_8);
    }
}

