/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.client.consumer.pop;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.consumer.pop.BasePop;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopClient;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BatchAckIT
extends BasePop {
    protected String topic;
    protected String group;
    protected RMQNormalProducer producer = null;
    protected RMQPopClient client = null;
    protected String brokerAddr;
    protected MessageQueue messageQueue;

    @Before
    public void setUp() {
        this.brokerAddr = brokerController1.getBrokerAddr();
        this.topic = MQRandomUtils.getRandomTopic();
        this.group = BatchAckIT.initConsumerGroup();
        IntegrationTestBase.initTopic(this.topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL);
        this.producer = BatchAckIT.getProducer(NAMESRV_ADDR, this.topic);
        this.client = this.getRMQPopClient();
        this.messageQueue = new MessageQueue(this.topic, BROKER1_NAME, -1);
    }

    @After
    public void tearDown() {
        BatchAckIT.shutdown();
    }

    @Test
    public void testBatchAckNormallyWithPopBuffer() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
        this.testBatchAck(() -> {
            try {
                return this.popMessageAsync().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
        this.testBatchAck(() -> {
            try {
                return this.popMessageAsync().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testBatchAckOrderly() throws Throwable {
        this.testBatchAck(() -> {
            try {
                return this.popMessageOrderlyAsync().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void testBatchAck(Supplier<PopResult> popResultSupplier) throws Throwable {
        this.producer.send(10L);
        AtomicInteger firstMsgRcvNum = new AtomicInteger();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = (PopResult)popResultSupplier.get();
            if (popResult.getPopStatus().equals((Object)PopStatus.FOUND)) {
                firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
            }
            Assert.assertEquals((long)10L, (long)firstMsgRcvNum.get());
        });
        TimeUnit.SECONDS.sleep(6L);
        this.producer.send(20L);
        ArrayList extraInfoList = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = (PopResult)popResultSupplier.get();
            if (popResult.getPopStatus().equals((Object)PopStatus.FOUND)) {
                for (MessageExt messageExt : popResult.getMsgFoundList()) {
                    extraInfoList.add(messageExt.getProperty("POP_CK"));
                }
            }
            Assert.assertEquals((long)30L, (long)extraInfoList.size());
        });
        AckResult ackResult = (AckResult)this.client.batchAckMessageAsync(this.brokerAddr, this.topic, this.group, extraInfoList).get();
        Assert.assertEquals((Object)AckStatus.OK, (Object)ackResult.getStatus());
        TimeUnit.SECONDS.sleep(6L);
        PopResult popResult = popResultSupplier.get();
        Assert.assertEquals((Object)PopStatus.POLLING_NOT_FOUND, (Object)popResult.getPopStatus());
    }

    private CompletableFuture<PopResult> popMessageAsync() {
        return this.client.popMessageAsync(this.brokerAddr, this.messageQueue, Duration.ofSeconds(3L).toMillis(), 30, this.group, 3000L, false, 0, false, "TAG", "*");
    }

    private CompletableFuture<PopResult> popMessageOrderlyAsync() {
        return this.client.popMessageAsync(this.brokerAddr, this.messageQueue, Duration.ofSeconds(3L).toMillis(), 30, this.group, 3000L, false, 0, true, "TAG", "*", null);
    }
}

