package org.apache.rocketmq.test.container;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.container.BrokerContainer;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration test that sends a fixed number of messages to a topic, consumes them with a push
 * consumer, and then waits until every slave broker sharing the master's broker name reports the
 * same consumer offsets as the master.
 *
 * <p>The test is currently disabled via {@link Ignore}.
 */
@Ignore
public class SyncConsumerOffsetIT extends ContainerIntegrationTestBase {
    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
    private static final String TEST_SYNC_TOPIC = SyncConsumerOffsetIT.class.getSimpleName() + "_topic";
    private static final String MSG = "Hello RocketMQ ";
    private static final byte[] MESSAGE_BODY = MSG.getBytes(StandardCharsets.UTF_8);

    /** Number of messages produced; also the minimum summed master offset required before comparing slaves. */
    private static final int MESSAGE_COUNT = 100;

    private static DefaultMQProducer mqProducer;
    private static DefaultMQPushConsumer mqConsumerThreeReplica;

    /**
     * Creates the test topic on the three-replica master, starts the producer, and prepares (but
     * does not start) the push consumer subscribed to every tag of the topic.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        createTopicTo(master3With3Replicas, TEST_SYNC_TOPIC);

        mqProducer = createProducer("SyncConsumerOffsetIT_Producer");
        mqProducer.setSendMsgTimeout(15000);
        mqProducer.start();

        mqConsumerThreeReplica = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        mqConsumerThreeReplica.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        mqConsumerThreeReplica.subscribe(TEST_SYNC_TOPIC, "*");
    }

    /**
     * Shuts down the clients created in {@link #beforeClass()}. Each client is null-checked so a
     * failed setup does not surface as a secondary NullPointerException, and the consumer is shut
     * down even if the producer shutdown throws.
     */
    @AfterClass
    public static void afterClass() {
        try {
            if (mqProducer != null) {
                mqProducer.shutdown();
            }
        } finally {
            if (mqConsumerThreeReplica != null) {
                mqConsumerThreeReplica.shutdown();
            }
        }
    }

    @Test
    public void syncConsumerOffsetWith3Replicas() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        syncConsumeOffsetInner(TEST_SYNC_TOPIC, mqConsumerThreeReplica, master3With3Replicas,
            Arrays.asList(brokerContainer1, brokerContainer2));
    }

    /**
     * Produces {@link #MESSAGE_COUNT} messages, waits for them to be consumed, then polls until the
     * slaves' consumer offsets equal the master's.
     *
     * @param topic topic to produce to and consume from
     * @param consumer push consumer, already subscribed but not yet started
     * @param master broker whose consumer offsets are the reference values
     * @param brokerContainerList containers whose slave brokers are compared against {@code master}
     */
    private void syncConsumeOffsetInner(String topic, DefaultMQPushConsumer consumer, BrokerController master,
        List<BrokerContainer> brokerContainerList)
        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        awaitUntilSlaveOK();
        final String consumerGroup = THREE_REPLICA_CONSUMER_GROUP;

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Java6Assertions.assertThat(mqProducer.send(new Message(topic, MESSAGE_BODY)).getSendStatus())
                .isEqualTo(SendStatus.SEND_OK);
        }

        CountDownLatch consumed = new CountDownLatch(MESSAGE_COUNT);
        // NOTE(review): if registerMessageListener is overloaded for several listener interfaces,
        // this implicitly typed lambda may need a cast to the concurrent listener type - confirm.
        consumer.registerMessageListener((msgs, context) -> {
            // Count every delivered message rather than every callback, so the latch still
            // reaches zero when a single callback carries more than one message.
            for (int i = 0; i < msgs.size(); i++) {
                consumed.countDown();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        Java6Assertions.assertThat(consumed.await(100L, TimeUnit.SECONDS)).isTrue();
        System.out.printf("consume complete%n");

        Set<MessageQueue> queues = filterMessageQueue(consumer.fetchSubscribeMessageQueues(topic), topic);
        Awaitility.await().atMost(120L, TimeUnit.SECONDS)
            .until(() -> isOffsetSynced(consumerGroup, topic, queues, master, brokerContainerList));
    }

    /**
     * Checks whether the master has recorded at least {@link #MESSAGE_COUNT} consumed messages in
     * total and whether every matching slave reports the same per-queue offsets.
     *
     * <p>Queues for which the master returns a negative offset are skipped (presumably "no offset
     * recorded yet" - confirm against the offset manager); they contribute nothing to the total
     * and are not compared on the slaves, which avoids unboxing a missing map entry.
     *
     * @return {@code true} once the master total is reached and all compared slave offsets match
     */
    private static boolean isOffsetSynced(String consumerGroup, String topic, Set<MessageQueue> queues,
        BrokerController master, List<BrokerContainer> brokerContainerList) {
        Map<Integer, Long> masterOffsets = new HashMap<>();
        long offsetTotal = 0L;
        for (MessageQueue queue : queues) {
            long offset = master.getConsumerOffsetManager().queryOffset(consumerGroup, topic, queue.getQueueId());
            if (offset >= 0) {
                offsetTotal += offset;
                masterOffsets.put(queue.getQueueId(), offset);
            }
        }
        if (offsetTotal < MESSAGE_COUNT) {
            return false;
        }

        String masterBrokerName = master.getBrokerConfig().getBrokerName();
        for (BrokerContainer container : brokerContainerList) {
            for (InnerSalveBrokerController slave : container.getSlaveBrokers()) {
                // Only slaves that share the master's broker name are compared.
                if (!slave.getBrokerConfig().getBrokerName().equals(masterBrokerName)) {
                    continue;
                }
                for (Map.Entry<Integer, Long> entry : masterOffsets.entrySet()) {
                    long slaveOffset =
                        slave.getConsumerOffsetManager().queryOffset(consumerGroup, topic, entry.getKey());
                    if (slaveOffset != entry.getValue().longValue()) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
