/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.container.BrokerContainer;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration test checking that consumer offsets committed on a master broker
 * are replicated to the slave brokers with the same broker name that are hosted
 * in the given broker containers.
 *
 * <p>The test sends a fixed number of messages, consumes them with a push
 * consumer, then polls the master's and the slaves' consumer offset managers
 * until the offsets agree (or the wait times out).
 */
@Ignore
public class SyncConsumerOffsetIT
extends ContainerIntegrationTestBase {
    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
    private static final String TEST_SYNC_TOPIC = SyncConsumerOffsetIT.class.getSimpleName() + "_topic";
    private static final String MSG = "Hello RocketMQ ";
    private static final byte[] MESSAGE_BODY = MSG.getBytes(StandardCharsets.UTF_8);
    /** Number of messages produced and expected to be consumed by the test. */
    private static final int MSG_COUNT = 100;

    private static DefaultMQProducer mqProducer;
    private static DefaultMQPushConsumer mqConsumerThreeReplica;

    /**
     * Creates the test topic on the three-replica master, starts the producer and
     * prepares (but does not start) the push consumer.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        createTopicTo(master3With3Replicas, TEST_SYNC_TOPIC);
        mqProducer = createProducer("SyncConsumerOffsetIT_Producer");
        mqProducer.setSendMsgTimeout(15000);
        mqProducer.start();
        mqConsumerThreeReplica = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        mqConsumerThreeReplica.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        mqConsumerThreeReplica.subscribe(TEST_SYNC_TOPIC, "*");
    }

    /**
     * Shuts down the clients. Each client is null-checked so that a failure in
     * {@link #beforeClass()} is not masked by a NullPointerException here.
     */
    @AfterClass
    public static void afterClass() {
        if (mqProducer != null) {
            mqProducer.shutdown();
        }
        if (mqConsumerThreeReplica != null) {
            mqConsumerThreeReplica.shutdown();
        }
    }

    @Test
    public void syncConsumerOffsetWith3Replicas() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        this.syncConsumeOffsetInner(TEST_SYNC_TOPIC, mqConsumerThreeReplica, master3With3Replicas, Arrays.asList(brokerContainer1, brokerContainer2));
    }

    /**
     * Sends {@link #MSG_COUNT} messages to {@code topic}, consumes them with
     * {@code consumer}, and waits until the consumer offsets recorded on
     * {@code master} are also reported by its slaves in {@code slaveContainers}.
     *
     * @param topic           topic to produce to and consume from
     * @param consumer        push consumer already subscribed to {@code topic}; started by this method
     * @param master          master broker whose committed offsets are the reference
     * @param slaveContainers containers that may host slaves of {@code master}
     */
    private void syncConsumeOffsetInner(String topic, DefaultMQPushConsumer consumer, BrokerController master, List<BrokerContainer> slaveContainers) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        awaitUntilSlaveOK();
        // Use the group of the consumer that actually commits the offsets.
        final String group = consumer.getConsumerGroup();

        for (int i = 0; i < MSG_COUNT; ++i) {
            Message msg = new Message(topic, MESSAGE_BODY);
            SendResult sendResult = mqProducer.send(msg);
            Java6Assertions.assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
        }

        final CountDownLatch countDownLatch = new CountDownLatch(MSG_COUNT);
        // The cast selects the concurrent-listener overload; an implicitly typed
        // two-argument lambda would otherwise also match the orderly-listener overload.
        consumer.registerMessageListener((org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently) (msgs, context) -> {
            // A callback may deliver a batch, so count every message rather than every call.
            for (int i = 0; i < msgs.size(); ++i) {
                countDownLatch.countDown();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        boolean ok = countDownLatch.await(100L, TimeUnit.SECONDS);
        Java6Assertions.assertThat(ok).isTrue();
        System.out.printf("consume complete%n");

        final Set<MessageQueue> mqSet = filterMessageQueue(consumer.fetchSubscribeMessageQueues(topic), topic);
        Awaitility.await().atMost(120L, TimeUnit.SECONDS).until(() -> {
            HashMap<Integer, Long> masterOffsets = new HashMap<Integer, Long>();
            long offsetTotal = 0L;
            for (MessageQueue mq : mqSet) {
                long queueOffset = master.getConsumerOffsetManager().queryOffset(group, topic, mq.getQueueId());
                if (queueOffset < 0L) {
                    // Negative offsets are not recorded, so this queue is absent from the map.
                    continue;
                }
                offsetTotal += queueOffset;
                masterOffsets.put(mq.getQueueId(), queueOffset);
            }
            // Wait until the master has recorded offsets covering every sent message.
            if (offsetTotal < MSG_COUNT) {
                return false;
            }
            return slavesMatchMaster(group, topic, master, slaveContainers, mqSet, masterOffsets);
        });
    }

    /**
     * Returns whether every slave that shares {@code master}'s broker name reports,
     * for each queue present in {@code masterOffsets}, the same consumer offset as
     * the master. Queues missing from {@code masterOffsets} are skipped.
     */
    private static boolean slavesMatchMaster(String group, String topic, BrokerController master, List<BrokerContainer> slaveContainers, Set<MessageQueue> mqSet, HashMap<Integer, Long> masterOffsets) {
        String masterName = master.getBrokerConfig().getBrokerName();
        for (BrokerContainer brokerContainer : slaveContainers) {
            for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) {
                if (!slave.getBrokerConfig().getBrokerName().equals(masterName)) {
                    continue;
                }
                for (MessageQueue mq : mqSet) {
                    Long masterOffset = masterOffsets.get(mq.getQueueId());
                    if (masterOffset == null) {
                        // Avoids unboxing null for queues the master had no offset for.
                        continue;
                    }
                    long slaveOffset = slave.getConsumerOffsetManager().queryOffset(group, topic, mq.getQueueId());
                    if (slaveOffset != masterOffset.longValue()) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}

