package org.apache.rocketmq.test.client.consumer.pop;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopClient;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.class */
public class BatchAckIT extends BasePop {
    protected String topic;
    protected String group;
    protected RMQNormalProducer producer = null;
    protected RMQPopClient client = null;
    protected String brokerAddr;
    protected MessageQueue messageQueue;

    @Before
    public void setUp() {
        this.brokerAddr = brokerController1.getBrokerAddr();
        this.topic = MQRandomUtils.getRandomTopic();
        this.group = initConsumerGroup();
        IntegrationTestBase.initTopic(this.topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL);
        this.producer = getProducer(NAMESRV_ADDR, this.topic);
        this.client = getRMQPopClient();
        this.messageQueue = new MessageQueue(this.topic, BROKER1_NAME, -1);
    }

    @After
    public void tearDown() {
        shutdown();
    }

    @Test
    public void testBatchAckNormallyWithPopBuffer() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
        testBatchAck(() -> {
            try {
                return popMessageAsync().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
        brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
        brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
        testBatchAck(() -> {
            try {
                return popMessageAsync().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testBatchAckOrderly() throws Throwable {
        testBatchAck(() -> {
            try {
                return popMessageOrderlyAsync().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void testBatchAck(Supplier<PopResult> supplier) throws Throwable {
        this.producer.send(10L);
        AtomicInteger atomicInteger = new AtomicInteger();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = (PopResult) supplier.get();
            if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
                atomicInteger.addAndGet(popResult.getMsgFoundList().size());
            }
            Assert.assertEquals(10L, atomicInteger.get());
        });
        TimeUnit.SECONDS.sleep(6L);
        this.producer.send(20L);
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
            PopResult popResult = (PopResult) supplier.get();
            if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
                Iterator it = popResult.getMsgFoundList().iterator();
                while (it.hasNext()) {
                    arrayList.add(((MessageExt) it.next()).getProperty("POP_CK"));
                }
            }
            Assert.assertEquals(30L, arrayList.size());
        });
        Assert.assertEquals(AckStatus.OK, ((AckResult) this.client.batchAckMessageAsync(this.brokerAddr, this.topic, this.group, arrayList).get()).getStatus());
        TimeUnit.SECONDS.sleep(6L);
        Assert.assertEquals(PopStatus.POLLING_NOT_FOUND, supplier.get().getPopStatus());
    }

    private CompletableFuture<PopResult> popMessageAsync() {
        return this.client.popMessageAsync(this.brokerAddr, this.messageQueue, Duration.ofSeconds(3L).toMillis(), 30, this.group, 3000L, false, 0, false, "TAG", "*");
    }

    private CompletableFuture<PopResult> popMessageOrderlyAsync() {
        return this.client.popMessageAsync(this.brokerAddr, this.messageQueue, Duration.ofSeconds(3L).toMillis(), 30, this.group, 3000L, false, 0, true, "TAG", "*", (String) null);
    }
}
