package org.apache.rocketmq.test.client.consumer.pop;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.class */
public class PopBigMessageIT extends BasePopNormally {
    private static final int BODY_LEN = 3145728;

    private Message createBigMessage() {
        return new Message(this.topic, new byte[BODY_LEN]);
    }

    @Test
    public void testSendAndRecvBigMsgWhenDisablePopBufferMerge() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
        testSendAndRecvBigMsg();
    }

    @Test
    public void testSendAndRecvBigMsgWhenEnablePopBufferMerge() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
        testSendAndRecvBigMsg();
    }

    private void testSendAndRecvBigMsg() {
        this.producer.send(createBigMessage());
        AtomicReference atomicReference = new AtomicReference();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = popMessageAsync(Duration.ofSeconds(3L).toMillis(), 1, 5000L).get();
            Assert.assertEquals(PopStatus.FOUND, popResult.getPopStatus());
            atomicReference.set(popResult.getMsgFoundList().get(0));
            assertMessageRecv((MessageExt) atomicReference.get());
        });
        Awaitility.await().atMost(Duration.ofSeconds(60L)).untilAsserted(() -> {
            PopResult popResult = popMessageAsync(Duration.ofSeconds(3L).toMillis(), 1, 5000L).get();
            Assert.assertEquals(PopStatus.FOUND, popResult.getPopStatus());
            assertMessageRecv((MessageExt) popResult.getMsgFoundList().get(0));
            Assert.assertEquals(((MessageExt) atomicReference.get()).getBody().length, r0.getBody().length);
        });
    }

    private void assertMessageRecv(MessageExt messageExt) throws IOException {
        Assert.assertEquals(1L, messageExt.getSysFlag() & 1);
        Assert.assertEquals(3145728L, CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(messageExt.getSysFlag())).decompress(messageExt.getBody()).length);
    }

    static {
        System.setProperty("com.rocketmq.decompress.body", "false");
    }
}
