package org.apache.rocketmq.test.base.dledger;

import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.class */
/**
 * Integration test that starts a broker with the DLedger commit log enabled,
 * sends a batch of messages to it and pulls them back, checking queue offsets
 * and message bodies along the way.
 */
public class DLedgerProduceAndConsumeIT {

    /**
     * Builds a broker configuration bound to the loopback address and to the
     * name server address held in {@code BaseConf.nsAddr}.
     *
     * @param str  value set as the broker cluster name
     * @param str2 value set as the broker name
     * @return the populated broker configuration
     */
    public BrokerConfig buildBrokerConfig(String str, String str2) {
        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setBrokerClusterName(str);
        brokerConfig.setBrokerName(str2);
        brokerConfig.setBrokerIP1("127.0.0.1");
        brokerConfig.setNamesrvAddr(BaseConf.nsAddr);
        return brokerConfig;
    }

    /**
     * Builds a message store configuration with the DLedger commit log enabled,
     * using a fresh base directory and a fresh HA listen port obtained from
     * {@code IntegrationTestBase}.
     *
     * @param str  value set as the DLedger group
     * @param str2 value set as the DLedger peers string
     * @param str3 value set as the DLedger self id
     * @return the populated message store configuration
     */
    public MessageStoreConfig buildStoreConfig(String str, String str2, String str3) {
        MessageStoreConfig storeConfig = new MessageStoreConfig();
        String baseDir = IntegrationTestBase.createBaseDir();
        storeConfig.setStorePathRootDir(baseDir);
        storeConfig.setStorePathCommitLog(baseDir + "_commitlog");
        storeConfig.setHaListenPort(IntegrationTestBase.nextPort());
        // 10485760 bytes = 10 MiB per mapped commit log file.
        storeConfig.setMappedFileSizeCommitLog(10485760);
        storeConfig.setEnableDLegerCommitLog(true);
        storeConfig.setdLegerGroup(str);
        storeConfig.setdLegerSelfId(str3);
        storeConfig.setdLegerPeers(str2);
        return storeConfig;
    }

    /**
     * Starts a single broker whose only DLedger peer is itself ({@code n0}),
     * sends ten messages to a one-queue topic, then pulls them back and checks
     * offsets and bodies.
     *
     * <p>The producer, the pull consumer and the broker are shut down in a
     * {@code finally} chain so that a failing assertion does not leave them
     * running.
     *
     * @throws Exception if the broker, producer or consumer fails, or the
     *                   thread is interrupted while sleeping
     */
    @Test
    public void testProduceAndConsume() throws Exception {
        final int messageCount = 10;

        String cluster = UUID.randomUUID().toString();
        String brokerName = UUID.randomUUID().toString();
        String peers = String.format("n0-localhost:%d", IntegrationTestBase.nextPort());
        BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
        // The broker name doubles as the DLedger group name.
        MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, "n0");
        BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);

        DefaultMQProducer producer = null;
        DefaultMQPullConsumer consumer = null;
        try {
            // NOTE(review): presumably waits for the DLedger node to settle its role
            // before the role assertion below - confirm the required delay.
            Thread.sleep(3000L);
            Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());

            String topic = UUID.randomUUID().toString();
            String consumerGroup = UUID.randomUUID().toString();
            IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
            producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
            consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);

            for (int i = 0; i < messageCount; i++) {
                Message message = new Message();
                message.setTopic(topic);
                message.setBody(("Hello" + i).getBytes());
                SendResult sendResult = producer.send(message);
                Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
                Assert.assertEquals(0L, sendResult.getMessageQueue().getQueueId());
                Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName());
                // The topic has a single queue, so the i-th send is expected at offset i.
                Assert.assertEquals(i, sendResult.getQueueOffset());
                Assert.assertNotNull(sendResult.getMsgId());
                Assert.assertNotNull(sendResult.getOffsetMsgId());
            }

            // NOTE(review): presumably lets the consume queue catch up with the
            // commit log before offsets are inspected - confirm.
            Thread.sleep(500L);
            Assert.assertEquals(0L, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0));
            Assert.assertEquals((long) messageCount, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0));

            PullResult pullResult = consumer.pull(new MessageQueue(topic, brokerName, 0), "*", 0L, 32);
            Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
            Assert.assertEquals((long) messageCount, pullResult.getMsgFoundList().size());
            for (int i = 0; i < messageCount; i++) {
                MessageExt pulled = (MessageExt) pullResult.getMsgFoundList().get(i);
                Assert.assertEquals(i, pulled.getQueueOffset());
                Assert.assertArrayEquals(("Hello" + i).getBytes(), pulled.getBody());
            }
        } finally {
            // Nested finally blocks: a failure in one shutdown must not skip the others.
            try {
                if (producer != null) {
                    producer.shutdown();
                }
            } finally {
                try {
                    if (consumer != null) {
                        consumer.shutdown();
                    }
                } finally {
                    brokerController.shutdown();
                }
            }
        }
    }
}
