/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.client.consumer.pop;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.test.client.consumer.pop.BasePopNormally;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;

public class PopBigMessageIT
extends BasePopNormally {
    private static final int BODY_LEN = 0x300000;

    private Message createBigMessage() {
        byte[] bytes = new byte[0x300000];
        return new Message(this.topic, bytes);
    }

    @Test
    public void testSendAndRecvBigMsgWhenDisablePopBufferMerge() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
        this.testSendAndRecvBigMsg();
    }

    @Test
    public void testSendAndRecvBigMsgWhenEnablePopBufferMerge() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
        this.testSendAndRecvBigMsg();
    }

    private void testSendAndRecvBigMsg() {
        Message message = this.createBigMessage();
        this.producer.send((Object)message);
        AtomicReference firstMessageExtRef = new AtomicReference();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = this.popMessageAsync(Duration.ofSeconds(3L).toMillis(), 1, 5000L).get();
            Assert.assertEquals((Object)PopStatus.FOUND, (Object)popResult.getPopStatus());
            firstMessageExtRef.set(popResult.getMsgFoundList().get(0));
            MessageExt messageExt = (MessageExt)firstMessageExtRef.get();
            this.assertMessageRecv(messageExt);
        });
        Awaitility.await().atMost(Duration.ofSeconds(60L)).untilAsserted(() -> {
            PopResult retryPopResult = this.popMessageAsync(Duration.ofSeconds(3L).toMillis(), 1, 5000L).get();
            Assert.assertEquals((Object)PopStatus.FOUND, (Object)retryPopResult.getPopStatus());
            MessageExt retryMessageExt = (MessageExt)retryPopResult.getMsgFoundList().get(0);
            this.assertMessageRecv(retryMessageExt);
            Assert.assertEquals((long)((MessageExt)firstMessageExtRef.get()).getBody().length, (long)retryMessageExt.getBody().length);
        });
    }

    private void assertMessageRecv(MessageExt messageExt) throws IOException {
        Assert.assertEquals((long)1L, (long)(messageExt.getSysFlag() & 1));
        Compressor compressor = CompressorFactory.getCompressor((CompressionType)MessageSysFlag.getCompressionType((int)messageExt.getSysFlag()));
        Assert.assertEquals((long)0x300000L, (long)compressor.decompress(messageExt.getBody()).length);
    }

    static {
        System.setProperty("com.rocketmq.decompress.body", "false");
    }
}

