/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class RebalanceLockOnSlaveIT
extends ContainerIntegrationTestBase {
    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
    private static DefaultMQProducer mqProducer;
    private static DefaultMQPushConsumer mqConsumerThreeReplica1;
    private static DefaultMQPushConsumer mqConsumerThreeReplica2;
    private static DefaultMQPushConsumer mqConsumerThreeReplica3;

    @BeforeClass
    public static void beforeClass() throws Exception {
        mqProducer = RebalanceLockOnSlaveIT.createProducer("SyncConsumerOffsetIT_Producer");
        mqProducer.start();
        mqConsumerThreeReplica1 = RebalanceLockOnSlaveIT.createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        mqConsumerThreeReplica1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        mqConsumerThreeReplica1.subscribe("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS", "*");
        mqConsumerThreeReplica2 = RebalanceLockOnSlaveIT.createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        mqConsumerThreeReplica2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        mqConsumerThreeReplica2.subscribe("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS", "*");
        mqConsumerThreeReplica3 = RebalanceLockOnSlaveIT.createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
        mqConsumerThreeReplica3.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        mqConsumerThreeReplica3.subscribe("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS", "*");
    }

    @AfterClass
    public static void afterClass() {
        if (mqProducer != null) {
            mqProducer.shutdown();
        }
    }

    @Test
    public void lockFromSlave() throws Exception {
        RebalanceLockOnSlaveIT.awaitUntilSlaveOK();
        mqConsumerThreeReplica3.registerMessageListener((msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica3.start();
        Set mqSet = mqConsumerThreeReplica3.fetchSubscribeMessageQueues("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        Assertions.assertThat((int)RebalanceLockOnSlaveIT.targetTopicMqCount(mqSet, "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS")).isEqualTo(24);
        for (MessageQueue mq : mqSet) {
            Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
        }
        RebalanceLockOnSlaveIT.isolateBroker(master3With3Replicas);
        mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        FindBrokerResult result = mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(master3With3Replicas.getBrokerConfig().getBrokerName(), 0L, true);
        Assertions.assertThat((Object)result).isNotNull();
        for (MessageQueue mq : mqSet) {
            if (!mq.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) continue;
            Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
        }
        RebalanceLockOnSlaveIT.removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
        Assertions.assertThat((int)brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
        mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        for (MessageQueue mq : mqSet) {
            if (!mq.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) continue;
            Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> !mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
        }
        RebalanceLockOnSlaveIT.cancelIsolatedBroker(master3With3Replicas);
        RebalanceLockOnSlaveIT.createAndAddSlave(1, brokerContainer1, master3With3Replicas);
        RebalanceLockOnSlaveIT.awaitUntilSlaveOK();
        mqConsumerThreeReplica3.shutdown();
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY);
    }

    @Ignore
    @Test
    public void multiConsumerLockFromSlave() throws MQClientException, InterruptedException {
        RebalanceLockOnSlaveIT.awaitUntilSlaveOK();
        mqConsumerThreeReplica1.registerMessageListener((msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica1.start();
        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
        Set<MessageQueue> mqSet1 = RebalanceLockOnSlaveIT.filterMessageQueue(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        Assertions.assertThat((int)mqSet1.size()).isEqualTo(24);
        RebalanceLockOnSlaveIT.isolateBroker(master3With3Replicas);
        System.out.printf("%s isolated%n", master3With3Replicas.getBrokerConfig().getCanonicalName());
        Thread.sleep(5000L);
        mqConsumerThreeReplica2.registerMessageListener((msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
        mqConsumerThreeReplica2.start();
        Thread.sleep(5000L);
        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        Assertions.assertThat((Object)mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(master3With3Replicas.getBrokerConfig().getBrokerName(), 0L, true)).isNotNull();
        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(master3With3Replicas.getBrokerConfig().getBrokerName(), 0L, true);
        Assertions.assertThat((Object)mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(master3With3Replicas.getBrokerConfig().getBrokerName(), 0L, true)).isNotNull();
        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().doRebalance();
        Set<MessageQueue> mqSet2 = RebalanceLockOnSlaveIT.filterMessageQueue(mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        mqSet1 = RebalanceLockOnSlaveIT.filterMessageQueue(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS");
        ArrayList<MessageQueue> mqList = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqSet2) {
            if (!mq.getTopic().equals("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS")) continue;
            mqList.add(mq);
        }
        for (MessageQueue mq : mqSet1) {
            if (!mq.getTopic().equals("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS")) continue;
            mqList.add(mq);
        }
        Awaitility.await().atMost(Duration.ofSeconds(30L)).until(() -> mqList.size() == 24);
        RebalanceLockOnSlaveIT.cancelIsolatedBroker(master3With3Replicas);
        RebalanceLockOnSlaveIT.awaitUntilSlaveOK();
        mqConsumerThreeReplica1.shutdown();
        mqConsumerThreeReplica2.shutdown();
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() -> mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY && mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY);
    }

    private static int targetTopicMqCount(Set<MessageQueue> mqSet, String topic) {
        int count = 0;
        for (MessageQueue mq : mqSet) {
            if (!mq.getTopic().equals(topic)) continue;
            ++count;
        }
        return count;
    }
}

