/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.client.consumer.pop;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.consumer.pop.BasePop;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopClient;
import org.apache.rocketmq.test.message.MessageQueueMsg;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;

/**
 * Shared fixture for orderly (FIFO) pop-consumption integration tests.
 *
 * <p>Each test gets a fresh random topic and consumer group on broker 1, a producer,
 * and a pop client. Subclasses pop messages through the {@code *Async} helpers, feed
 * every received message to {@link #onRecvNewMessage(MessageExt)}, and then verify the
 * recorded state with the {@code assert*} helpers.
 *
 * <p>The class itself is marked {@link Ignore} because it declares no test methods.
 */
@Ignore
public class BasePopOrderly extends BasePop {
    protected String topic;
    protected String group;
    protected RMQNormalProducer producer = null;
    protected RMQPopClient client = null;
    protected String brokerAddr;
    protected MessageQueue messageQueue;
    /** Every receipt of a message, keyed by msgId, in the order the receipts were recorded. */
    protected final Map<String, List<BasePop.MsgRcv>> msgRecv = new ConcurrentHashMap<>();
    /** msgId of every recorded receipt, in arrival order (one entry per receipt). */
    protected final List<String> msgRecvSequence = new CopyOnWriteArrayList<>();
    /** Body of every recorded receipt decoded to a {@code String}, in arrival order. */
    protected final List<Object> msgDataRecv = new CopyOnWriteArrayList<>();

    /**
     * Prepares the per-test topic, group, producer, pop client and target queue.
     */
    @Before
    public void setUp() {
        brokerController1.getBrokerConfig().setEnableNotifyAfterPopOrderLockRelease(true);
        this.brokerAddr = brokerController1.getBrokerAddr();
        this.topic = MQRandomUtils.getRandomTopic();
        this.group = BasePopOrderly.initConsumerGroup();
        // NOTE(review): the literal 1 is presumably the queue count - confirm against initTopic.
        IntegrationTestBase.initTopic(this.topic, NAMESRV_ADDR, BROKER1_NAME, 1, CQType.SimpleCQ, TopicMessageType.FIFO);
        this.producer = BasePopOrderly.getProducer(NAMESRV_ADDR, this.topic);
        this.client = this.getRMQPopClient();
        // NOTE(review): queue id -1 presumably means "any queue of the topic" - confirm.
        this.messageQueue = new MessageQueue(this.topic, BROKER1_NAME, -1);
    }

    /**
     * Releases the shared test resources via the inherited static {@code shutdown()}.
     */
    @After
    public void tearDown() {
        BasePopOrderly.shutdown();
    }

    /**
     * Builds a {@link MessageQueueMsg} for {@link #messageQueue} and sends its messages
     * with the test producer.
     *
     * @param num value forwarded to {@link MessageQueueMsg}; presumably the number of
     *            messages to generate (TODO confirm against MessageQueueMsg)
     */
    protected void sendMessage(int num) {
        // Typed varargs call: avoids the raw-type/unchecked cast the decompiled form used.
        List<MessageQueue> targetQueues = Lists.newArrayList(this.messageQueue);
        MessageQueueMsg mqMsgs = new MessageQueueMsg(targetQueues, num);
        this.producer.send(mqMsgs.getMsgsWithMQ());
    }

    /**
     * Runs the order verification of {@link VerifyUtils} over the received bodies.
     */
    protected void assertMessageRecvOrder() {
        // NOTE(review): if verifyOrderMsg reports its verdict as a return value, that value
        // is discarded here and this method can never fail - confirm and assert on it.
        VerifyUtils.verifyOrderMsg(this.msgDataRecv);
    }

    /**
     * Asserts that the message received at position {@code seqId} has been received
     * exactly {@code expectNum} times and that the i-th receipt reports a reconsume
     * count of {@code i}.
     *
     * @param seqId     index into {@link #msgRecvSequence}
     * @param expectNum expected number of receipts for that message
     */
    protected void assertMsgRecv(int seqId, int expectNum) {
        this.assertConsumeTimes(this.receiptsAt(seqId, expectNum));
    }

    /**
     * Asserts that the i-th receipt in {@code msgRcvList} reports a reconsume count of {@code i}.
     *
     * @param msgRcvList receipts of a single message, in arrival order
     */
    protected void assertConsumeTimes(List<BasePop.MsgRcv> msgRcvList) {
        for (int i = 0; i < msgRcvList.size(); ++i) {
            Assert.assertEquals("reconsume times of receipt #" + i,
                (long) i, (long) msgRcvList.get(i).messageExt.getReconsumeTimes());
        }
    }

    /**
     * Asserts that the message received at position {@code seqId} has been received
     * exactly {@code expectNum} times and that its receipts report the given reconsume counts.
     *
     * @param seqId                index into {@link #msgRecvSequence}
     * @param expectNum            expected number of receipts for that message
     * @param expectReconsumeTimes expected reconsume count of each receipt, by receipt index
     */
    protected void assertMsgRecv(int seqId, int expectNum, List<Integer> expectReconsumeTimes) {
        this.assertConsumeTimes(this.receiptsAt(seqId, expectNum), expectReconsumeTimes);
    }

    /**
     * Asserts that the i-th receipt in {@code msgRcvList} reports the i-th expected reconsume count.
     *
     * @param msgRcvList           receipts of a single message, in arrival order
     * @param expectReconsumeTimes expected reconsume counts; must cover every receipt
     */
    protected void assertConsumeTimes(List<BasePop.MsgRcv> msgRcvList, List<Integer> expectReconsumeTimes) {
        Assert.assertNotNull("expectReconsumeTimes must not be null", expectReconsumeTimes);
        // Fail with a readable message instead of an IndexOutOfBoundsException inside the loop.
        Assert.assertTrue("expected reconsume times cover " + expectReconsumeTimes.size()
            + " receipts but " + msgRcvList.size() + " were recorded",
            expectReconsumeTimes.size() >= msgRcvList.size());
        for (int i = 0; i < msgRcvList.size(); ++i) {
            Assert.assertEquals("reconsume times of receipt #" + i,
                (long) expectReconsumeTimes.get(i).intValue(),
                (long) msgRcvList.get(i).messageExt.getReconsumeTimes());
        }
    }

    /**
     * Records one receipt of {@code messageExt}: its body, its msgId in the arrival
     * sequence, and a timestamped {@link BasePop.MsgRcv} entry under its msgId.
     *
     * @param messageExt the message that was just popped
     */
    protected void onRecvNewMessage(MessageExt messageExt) {
        // NOTE(review): decodes with the platform default charset; confirm the producer
        // side encodes the same way before pinning an explicit charset here.
        this.msgDataRecv.add(new String(messageExt.getBody()));
        this.msgRecvSequence.add(messageExt.getMsgId());
        // compute() creates the per-message list on first receipt and appends atomically per key.
        this.msgRecv.compute(messageExt.getMsgId(), (msgId, receipts) -> {
            List<BasePop.MsgRcv> target = receipts != null ? receipts : new CopyOnWriteArrayList<BasePop.MsgRcv>();
            target.add(new BasePop.MsgRcv(System.currentTimeMillis(), messageExt));
            return target;
        });
    }

    /**
     * Pops from {@link #messageQueue} without an attempt id.
     *
     * @param invisibleTime invisible time forwarded to the pop request
     * @param maxNums       maximum number of messages forwarded to the pop request
     * @param timeout       timeout forwarded to the pop request
     * @return future completing with the pop result
     */
    protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout) {
        return this.popMessageOrderlyAsync(invisibleTime, maxNums, timeout, null);
    }

    /**
     * Pops from {@link #messageQueue} on {@link #brokerAddr} for {@link #group}.
     *
     * @param invisibleTime invisible time forwarded to the pop request
     * @param maxNums       maximum number of messages forwarded to the pop request
     * @param timeout       timeout forwarded to the pop request
     * @param attemptId     attempt id forwarded to the pop request; may be {@code null}
     * @return future completing with the pop result
     */
    protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout, String attemptId) {
        // NOTE(review): the fixed arguments (true, 0, true, "TAG", "*") are presumably
        // poll, initMode, order, expression type and expression - confirm against RMQPopClient.
        return this.client.popMessageAsync(this.brokerAddr, this.messageQueue, invisibleTime, maxNums, this.group, timeout, true, 0, true, "TAG", "*", attemptId);
    }

    /**
     * Acknowledges {@code messageExt} using the value of its {@code POP_CK} property.
     *
     * @param messageExt a previously popped message
     * @return future completing with the ack result
     */
    protected CompletableFuture<AckResult> ackMessageAsync(MessageExt messageExt) {
        return this.client.ackMessageAsync(this.brokerAddr, this.topic, this.group, messageExt.getProperty("POP_CK"));
    }

    /**
     * Changes the invisible time of {@code messageExt} using the value of its {@code POP_CK} property.
     *
     * @param messageExt    a previously popped message
     * @param invisibleTime new invisible time forwarded to the broker
     * @return future completing with the result of the change
     */
    protected CompletableFuture<AckResult> changeInvisibleTimeAsync(MessageExt messageExt, long invisibleTime) {
        return this.client.changeInvisibleTimeAsync(this.brokerAddr, BROKER1_NAME, this.topic, this.group, messageExt.getProperty("POP_CK"), invisibleTime);
    }

    /**
     * Looks up the receipts of the message at {@code seqId} in the arrival sequence and
     * asserts that there are exactly {@code expectNum} of them.
     */
    private List<BasePop.MsgRcv> receiptsAt(int seqId, int expectNum) {
        int recorded = this.msgRecvSequence.size();
        Assert.assertTrue("no receipt recorded at sequence index " + seqId + " (recorded: " + recorded + ")",
            seqId >= 0 && seqId < recorded);
        String msgId = this.msgRecvSequence.get(seqId);
        List<BasePop.MsgRcv> msgRcvList = this.msgRecv.get(msgId);
        Assert.assertNotNull("no receipts recorded for msgId " + msgId, msgRcvList);
        Assert.assertEquals("receipt count for msgId " + msgId, (long) expectNum, (long) msgRcvList.size());
        return msgRcvList;
    }
}

