package org.apache.rocketmq.test.container;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration test that exercises message-queue locking by orderly push consumers on the
 * three-replica topic while the broker {@code master3With3Replicas} is isolated.
 *
 * <p>The whole class is currently disabled through {@link Ignore}. Broker fixtures and the
 * isolate/restore helpers used here are inherited from {@link ContainerIntegrationTestBase}.
 */
@Ignore
public class RebalanceLockOnSlaveIT extends ContainerIntegrationTestBase {
    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
    private static final String PRODUCER_GROUP = "SyncConsumerOffsetIT_Producer";

    /** Topic every consumer in this class subscribes to. */
    private static final String TARGET_TOPIC = "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS";

    /** Number of queues the tests expect to see for {@link #TARGET_TOPIC}. */
    private static final int EXPECTED_QUEUE_COUNT = 24;

    /** Upper bound for a single lock (or lock-failure) expectation to become true. */
    private static final Duration LOCK_TIMEOUT = Duration.ofSeconds(60L);

    private static DefaultMQProducer mqProducer;
    private static DefaultMQPushConsumer mqConsumerThreeReplica1;
    private static DefaultMQPushConsumer mqConsumerThreeReplica2;
    private static DefaultMQPushConsumer mqConsumerThreeReplica3;

    /**
     * Starts the shared producer and prepares three consumers of the same group. The consumers
     * are configured and subscribed but not started; each test starts the ones it needs.
     *
     * @throws Exception if the producer cannot be started or a consumer cannot be created
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        mqProducer = createProducer(PRODUCER_GROUP);
        mqProducer.start();

        mqConsumerThreeReplica1 = newSubscribedConsumer();
        mqConsumerThreeReplica2 = newSubscribedConsumer();
        mqConsumerThreeReplica3 = newSubscribedConsumer();
    }

    /**
     * Releases every client created by {@link #beforeClass()}.
     *
     * <p>The consumers are shut down here as well as at the end of each test, so that a test
     * failing before its own shutdown call does not leave a started consumer behind.
     * NOTE(review): this relies on {@code shutdown()} being harmless for a consumer that was
     * never started or is already shut down - confirm against the client version in use.
     */
    @AfterClass
    public static void afterClass() {
        shutdownIfPresent(mqConsumerThreeReplica1);
        shutdownIfPresent(mqConsumerThreeReplica2);
        shutdownIfPresent(mqConsumerThreeReplica3);
        if (mqProducer != null) {
            mqProducer.shutdown();
        }
    }

    /**
     * Checks queue locking for a single consumer in three phases: all queues can be locked
     * initially; the queues of {@code master3With3Replicas} can still be locked after that broker
     * is isolated; and those queues can no longer be locked once slave 1 has been removed from
     * {@code brokerContainer1}. The isolation is then cancelled and the slave re-added.
     *
     * @throws Exception if any client or broker-container operation fails
     */
    @Test
    public void lockFromSlave() throws Exception {
        awaitUntilSlaveOK();

        // The cast selects the orderly overload; an implicitly typed lambda alone is ambiguous
        // between the orderly and concurrent listener overloads.
        mqConsumerThreeReplica3.registerMessageListener(
            (MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica3.start();

        final Set<MessageQueue> queues = mqConsumerThreeReplica3.fetchSubscribeMessageQueues(TARGET_TOPIC);
        Assertions.assertThat(targetTopicMqCount(queues, TARGET_TOPIC)).isEqualTo(EXPECTED_QUEUE_COUNT);

        // Phase 1: every queue must become lockable.
        for (MessageQueue queue : queues) {
            Awaitility.await().atMost(LOCK_TIMEOUT).until(() -> tryLock(mqConsumerThreeReplica3, queue));
        }

        // Phase 2: isolate the broker; its queues must still become lockable
        // (presumably served by a remaining replica, as the test name suggests).
        isolateBroker(master3With3Replicas);
        final String isolatedBrokerName = master3With3Replicas.getBrokerConfig().getBrokerName();
        refreshRoute(mqConsumerThreeReplica3);
        Assertions.assertThat(findBrokerForSubscribe(mqConsumerThreeReplica3, isolatedBrokerName)).isNotNull();

        for (MessageQueue queue : queues) {
            if (queue.getBrokerName().equals(isolatedBrokerName)) {
                Awaitility.await().atMost(LOCK_TIMEOUT).until(() -> tryLock(mqConsumerThreeReplica3, queue));
            }
        }

        // Phase 3: drop slave 1 as well; locking the isolated broker's queues must now fail.
        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
        Assertions.assertThat(brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
        refreshRoute(mqConsumerThreeReplica3);

        for (MessageQueue queue : queues) {
            if (queue.getBrokerName().equals(isolatedBrokerName)) {
                Awaitility.await().atMost(LOCK_TIMEOUT).until(() -> !tryLock(mqConsumerThreeReplica3, queue));
            }
        }

        // Restore the cluster for subsequent tests.
        cancelIsolatedBroker(master3With3Replicas);
        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
        awaitUntilSlaveOK();

        mqConsumerThreeReplica3.shutdown();
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() -> isShutDown(mqConsumerThreeReplica3));
    }

    /**
     * Starts a first consumer, isolates {@code master3With3Replicas}, starts a second consumer of
     * the same group, and expects the two consumers together to end up holding
     * {@link #EXPECTED_QUEUE_COUNT} queue entries of the target topic.
     *
     * @throws MQClientException if a consumer fails to start
     * @throws InterruptedException if the thread is interrupted during one of the fixed pauses
     */
    @Test
    @Ignore
    public void multiConsumerLockFromSlave() throws MQClientException, InterruptedException {
        awaitUntilSlaveOK();

        mqConsumerThreeReplica1.registerMessageListener(
            (MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica1.start();
        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
        // While it is the only group member, consumer 1 is expected to own every queue.
        Assertions.assertThat(ownedQueues(mqConsumerThreeReplica1).size()).isEqualTo(EXPECTED_QUEUE_COUNT);

        isolateBroker(master3With3Replicas);
        System.out.printf("%s isolated%n", master3With3Replicas.getBrokerConfig().getCanonicalName());
        Thread.sleep(5000L);

        mqConsumerThreeReplica2.registerMessageListener(
            (MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica2.start();
        Thread.sleep(5000L);

        refreshRoute(mqConsumerThreeReplica1);
        refreshRoute(mqConsumerThreeReplica2);

        final String isolatedBrokerName = master3With3Replicas.getBrokerConfig().getBrokerName();
        Assertions.assertThat(findBrokerForSubscribe(mqConsumerThreeReplica1, isolatedBrokerName)).isNotNull();
        Assertions.assertThat(findBrokerForSubscribe(mqConsumerThreeReplica2, isolatedBrokerName)).isNotNull();

        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().doRebalance();

        // Re-read both process-queue tables on every poll. Counting a snapshot taken once before
        // the wait would make the wait meaningless, because its value could never change.
        Awaitility.await().atMost(Duration.ofSeconds(30L)).until(() ->
            targetTopicMqCount(ownedQueues(mqConsumerThreeReplica2), TARGET_TOPIC)
                + targetTopicMqCount(ownedQueues(mqConsumerThreeReplica1), TARGET_TOPIC)
                == EXPECTED_QUEUE_COUNT);

        cancelIsolatedBroker(master3With3Replicas);
        awaitUntilSlaveOK();

        mqConsumerThreeReplica1.shutdown();
        mqConsumerThreeReplica2.shutdown();
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() ->
            isShutDown(mqConsumerThreeReplica1) && isShutDown(mqConsumerThreeReplica2));
    }

    /** Creates a consumer in the shared group, consuming from the last offset of the target topic. */
    private static DefaultMQPushConsumer newSubscribedConsumer() throws Exception {
        DefaultMQPushConsumer consumer = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(TARGET_TOPIC, "*");
        return consumer;
    }

    /** Shuts the consumer down unless it was never created. */
    private static void shutdownIfPresent(DefaultMQPushConsumer consumer) {
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    /** Makes one lock attempt for {@code queue} through the consumer's rebalance implementation. */
    private static boolean tryLock(DefaultMQPushConsumer consumer, MessageQueue queue) {
        return consumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(queue);
    }

    /** Forces the consumer's client to re-fetch the target topic's route from the name server. */
    private static void refreshRoute(DefaultMQPushConsumer consumer) {
        consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(TARGET_TOPIC);
    }

    /**
     * Asks the consumer's client for a broker address usable for subscription under the given
     * broker name, passing broker id {@code 0} and the flag {@code true} exactly as the tests
     * always have. Returns whatever the client returns, which may be {@code null}.
     */
    private static Object findBrokerForSubscribe(DefaultMQPushConsumer consumer, String brokerName) {
        return consumer.getDefaultMQPushConsumerImpl().getmQClientFactory()
            .findBrokerAddressInSubscribe(brokerName, 0L, true);
    }

    /** Returns the consumer's current process-queue keys filtered for the target topic. */
    private static Set<MessageQueue> ownedQueues(DefaultMQPushConsumer consumer) {
        return filterMessageQueue(
            consumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(),
            TARGET_TOPIC);
    }

    /** Tells whether the consumer has reached {@link ServiceState#SHUTDOWN_ALREADY}. */
    private static boolean isShutDown(DefaultMQPushConsumer consumer) {
        return consumer.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY;
    }

    /**
     * Counts the queues in {@code set} that belong to the topic {@code str}.
     *
     * @param set queues to inspect
     * @param str topic name to match exactly
     * @return number of matching queues
     */
    private static int targetTopicMqCount(Set<MessageQueue> set, String str) {
        int matches = 0;
        for (MessageQueue queue : set) {
            if (queue.getTopic().equals(str)) {
                matches++;
            }
        }
        return matches;
    }
}
