/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.container.BrokerContainer;
import org.apache.rocketmq.container.InnerBrokerController;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration tests that exercise POP-mode consumption of a topic while one or more brokers
 * are isolated or removed from their broker containers.
 *
 * <p>Each test sends a fixed number of messages to queue 0 of {@code master1With3Replicas},
 * isolates brokers, consumes the messages, and then inspects the POP retry topic.
 * The class is annotated with {@link Ignore}, so JUnit skips it.
 */
@Ignore
public class PopSlaveActingMasterIT
extends ContainerIntegrationTestBase {
    // Consumer group shared by every consumer created in this class.
    private static final String CONSUME_GROUP = PopSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
    // Number of messages each test sends and expects to observe.
    private static final int MESSAGE_COUNT = 16;
    // Source of the numeric suffix appended to per-test topic names.
    private final Random random = new Random();
    // Producer shared by all tests; assigned in beforeClass().
    private static DefaultMQProducer producer;
    // Random payload text and its UTF-8 bytes; both are assigned in the static initializer.
    private static final String MESSAGE_STRING;
    private static final byte[] MESSAGE_BODY;
    // Default-constructed config, read only for isEnableRetryTopicV2() when building retry topic names.
    private final BrokerConfig brokerConfig = new BrokerConfig();

    /**
     * Creates {@code topic} on each of the three master brokers provided by the base class
     * (master1, master2 and master3), passing {@code 1, 1} as the trailing arguments of
     * {@code createTopicTo}.
     *
     * @param topic name of the topic to create
     */
    void createTopic(String topic) {
        // NOTE(review): the two trailing 1s are presumably queue counts — confirm against createTopicTo.
        createTopicTo(master1With3Replicas, topic, 1, 1);
        createTopicTo(master2With3Replicas, topic, 1, 1);
        createTopicTo(master3With3Replicas, topic, 1, 1);
    }

    /**
     * Creates and starts the producer shared by all tests in this class, with a send timeout
     * of 5000 (passed to {@code setSendMsgTimeout}).
     *
     * @throws Throwable if the producer cannot be created or started
     */
    @BeforeClass
    public static void beforeClass() throws Throwable {
        String producerName = PopSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER";
        // Assign the field before configuring/starting so afterClass() can still see the instance.
        producer = createProducer(producerName);
        producer.setSendMsgTimeout(5000);
        producer.start();
    }

    /**
     * Shuts down the shared producer, if one was created.
     *
     * @throws Exception if shutting down the producer fails
     */
    @AfterClass
    public static void afterClass() throws Exception {
        // beforeClass() may have failed before assigning the field; do not add an NPE on top of that failure.
        if (producer != null) {
            producer.shutdown();
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages to queue 0 of master1, isolates master1, consumes all
     * messages with {@code CONSUME_SUCCESS}, and then verifies that nothing arrives on the POP
     * retry topic within 10 seconds.
     *
     * <p>Broker isolation is cancelled and the consumers are shut down even when an assertion
     * fails, so a failure here does not leave master1 isolated for later tests.
     *
     * @throws Exception if any client, admin or broker operation fails
     */
    @Test
    public void testLocalActing_ackSlave() throws Exception {
        String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(topic);
        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
        createTopic(retryTopic);
        switchPop(topic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            SendResult sendResult = producer.send(new Message(topic, MESSAGE_BODY), messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                sendSuccess++;
            }
        }
        // The counter cannot change after the loop, so assert immediately instead of polling it.
        Assertions.assertThat(sendSuccess).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        isolateBroker(master1With3Replicas);
        DefaultMQPushConsumer pushConsumer = null;
        try {
            CopyOnWriteArrayList<String> consumedMessages = new CopyOnWriteArrayList<>();
            DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
            try {
                consumer.subscribe(topic, "*");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // NOTE(review): if registerMessageListener is overloaded in the client API, this implicitly
                // typed lambda may need an explicit listener-type cast (possibly dropped by the decompiler) — confirm.
                consumer.registerMessageListener((msgs, context) -> {
                    msgs.forEach(msg -> consumedMessages.add(msg.getMsgId()));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.setClientRebalance(false);
                consumer.start();
                Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> consumedMessages.size() >= MESSAGE_COUNT);
            } finally {
                consumer.shutdown();
            }

            CopyOnWriteArrayList<String> retryMsgList = new CopyOnWriteArrayList<>();
            pushConsumer = createPushConsumer(CONSUME_GROUP);
            pushConsumer.subscribe(retryTopic, "*");
            pushConsumer.registerMessageListener((msgs, context) -> {
                for (MessageExt msg : msgs) {
                    retryMsgList.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            pushConsumer.start();
            // Give retry messages a chance to show up before asserting that there are none.
            Thread.sleep(10000L);
            Assertions.assertThat(retryMsgList).isEmpty();
        } finally {
            try {
                cancelIsolatedBroker(master1With3Replicas);
                awaitUntilSlaveOK();
            } finally {
                if (pushConsumer != null) {
                    pushConsumer.shutdown();
                }
            }
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages to queue 0 of master1, isolates master1, consumes all
     * messages while answering {@code RECONSUME_LATER}, and then waits (up to 3 minutes, polling
     * every 10 seconds) until at least {@code MESSAGE_COUNT} messages with a known body have
     * arrived on the POP retry topic.
     *
     * <p>Broker isolation is cancelled and the consumers are shut down even when an assertion
     * or wait fails.
     *
     * @throws Exception if any client, admin or broker operation fails
     */
    @Test
    public void testLocalActing_notAckSlave() throws Exception {
        String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(topic);
        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
        createTopic(retryTopic);
        switchPop(topic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

        // Every message is built from MESSAGE_BODY, so this set ends up holding one distinct string.
        HashSet<String> sendToIsolateMsgSet = new HashSet<>();
        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message msg = new Message(topic, MESSAGE_BODY);
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                sendToIsolateMsgSet.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                sendSuccess++;
            }
        }
        System.out.printf("send success %d%n", sendSuccess);
        // The counter cannot change after the loop, so assert immediately instead of polling it.
        Assertions.assertThat(sendSuccess).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        isolateBroker(master1With3Replicas);
        DefaultMQPushConsumer pushConsumer = null;
        try {
            System.out.printf("isolate master1%n");
            CopyOnWriteArrayList<String> consumedMessages = new CopyOnWriteArrayList<>();
            DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
            try {
                consumer.subscribe(topic, "*");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.setPopInvisibleTime(5000L);
                // NOTE(review): if registerMessageListener is overloaded in the client API, this implicitly
                // typed lambda may need an explicit listener-type cast (possibly dropped by the decompiler) — confirm.
                consumer.registerMessageListener((msgs, context) -> {
                    msgs.forEach(msg -> {
                        msg.setReconsumeTimes(0);
                        consumedMessages.add(msg.getMsgId());
                    });
                    // Never acknowledge, so the messages are expected to be routed to the retry topic.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                });
                consumer.setClientRebalance(false);
                consumer.start();
                Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> consumedMessages.size() >= MESSAGE_COUNT);
            } finally {
                consumer.shutdown();
            }

            CopyOnWriteArrayList<String> retryMsgList = new CopyOnWriteArrayList<>();
            pushConsumer = createPushConsumer(CONSUME_GROUP);
            pushConsumer.subscribe(retryTopic, "*");
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            pushConsumer.registerMessageListener((msgs, context) -> {
                for (MessageExt msg : msgs) {
                    retryMsgList.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            pushConsumer.start();
            Awaitility.await()
                .atMost(Duration.ofMinutes(3L))
                .pollInterval(Duration.ofSeconds(10L))
                .until(() -> retryMsgList.size() >= MESSAGE_COUNT && sendToIsolateMsgSet.containsAll(retryMsgList));
        } finally {
            try {
                cancelIsolatedBroker(master1With3Replicas);
                awaitUntilSlaveOK();
            } finally {
                if (pushConsumer != null) {
                    pushConsumer.shutdown();
                }
            }
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages to queue 0 of master1, isolates master1 and master2,
     * removes master2 from {@code brokerContainer2}, consumes all messages with
     * {@code CONSUME_SUCCESS}, and verifies that nothing arrives on the POP retry topic — both
     * while the brokers are down and again 10 seconds after they have been restored.
     *
     * <p>The brokers are restored and the consumers are shut down even when an assertion fails.
     *
     * @throws Exception if any client, admin or broker operation fails
     */
    @Test
    public void testRemoteActing_ackSlave() throws Exception {
        String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(topic);
        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
        createTopic(retryTopic);
        switchPop(topic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            SendResult sendResult = producer.send(new Message(topic, MESSAGE_BODY), messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                sendSuccess++;
            }
        }
        // The counter cannot change after the loop, so assert immediately instead of polling it.
        Assertions.assertThat(sendSuccess).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        isolateBroker(master1With3Replicas);
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(
            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
            master2With3Replicas.getBrokerConfig().getBrokerName(),
            master2With3Replicas.getBrokerConfig().getBrokerId()));

        DefaultMQPushConsumer pushConsumer = null;
        try {
            CopyOnWriteArrayList<String> retryMsgList = new CopyOnWriteArrayList<>();
            try {
                CopyOnWriteArrayList<String> consumedMessages = new CopyOnWriteArrayList<>();
                DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
                try {
                    consumer.subscribe(topic, "*");
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                    // NOTE(review): if registerMessageListener is overloaded in the client API, this implicitly
                    // typed lambda may need an explicit listener-type cast (possibly dropped by the decompiler) — confirm.
                    consumer.registerMessageListener((msgs, context) -> {
                        msgs.forEach(msg -> consumedMessages.add(msg.getMsgId()));
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    });
                    consumer.setClientRebalance(false);
                    consumer.start();
                    Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> consumedMessages.size() >= MESSAGE_COUNT);
                } finally {
                    consumer.shutdown();
                }

                pushConsumer = createPushConsumer(CONSUME_GROUP);
                pushConsumer.subscribe(retryTopic, "*");
                pushConsumer.registerMessageListener((msgs, context) -> {
                    for (MessageExt msg : msgs) {
                        retryMsgList.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                pushConsumer.start();
                // Give retry messages a chance to show up before asserting that there are none.
                Thread.sleep(10000L);
                Assertions.assertThat(retryMsgList).isEmpty();
            } finally {
                // Restore the cluster: un-isolate master1, then re-add, start and un-isolate master2.
                cancelIsolatedBroker(master1With3Replicas);
                master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
                master2With3Replicas.start();
                cancelIsolatedBroker(master2With3Replicas);
                awaitUntilSlaveOK();
            }

            // The retry topic must stay empty after the brokers have come back as well.
            Thread.sleep(10000L);
            Assertions.assertThat(retryMsgList).isEmpty();
        } finally {
            if (pushConsumer != null) {
                pushConsumer.shutdown();
            }
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages to queue 0 of master1, isolates master1 and master2,
     * removes master2 from {@code brokerContainer2}, consumes all messages while answering
     * {@code RECONSUME_LATER}, and then waits up to 1 minute until at least
     * {@code MESSAGE_COUNT} messages with a known body have arrived on the POP retry topic.
     *
     * <p>The brokers are restored and the consumers are shut down even when a wait fails.
     *
     * @throws Exception if any client, admin or broker operation fails
     */
    @Test
    public void testRemoteActing_notAckSlave_getFromLocal() throws Exception {
        String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(topic);
        switchPop(topic);
        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
        createTopic(retryTopic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

        // Every message is built from MESSAGE_BODY, so this set ends up holding one distinct string.
        HashSet<String> sendToIsolateMsgSet = new HashSet<>();
        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message msg = new Message(topic, MESSAGE_BODY);
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                sendToIsolateMsgSet.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                sendSuccess++;
            }
        }
        // The counter cannot change after the loop, so assert immediately instead of polling it.
        Assertions.assertThat(sendSuccess).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        isolateBroker(master1With3Replicas);
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(
            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
            master2With3Replicas.getBrokerConfig().getBrokerName(),
            master2With3Replicas.getBrokerConfig().getBrokerId()));

        DefaultMQPushConsumer pushConsumer = null;
        try {
            CopyOnWriteArrayList<String> consumedMessages = new CopyOnWriteArrayList<>();
            DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
            try {
                consumer.subscribe(topic, "*");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // NOTE(review): if registerMessageListener is overloaded in the client API, this implicitly
                // typed lambda may need an explicit listener-type cast (possibly dropped by the decompiler) — confirm.
                consumer.registerMessageListener((msgs, context) -> {
                    msgs.forEach(msg -> consumedMessages.add(msg.getMsgId()));
                    // Never acknowledge, so the messages are expected to be routed to the retry topic.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                });
                consumer.setClientRebalance(false);
                consumer.start();
                Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> consumedMessages.size() >= MESSAGE_COUNT);
            } finally {
                consumer.shutdown();
            }

            CopyOnWriteArrayList<String> retryMsgList = new CopyOnWriteArrayList<>();
            pushConsumer = createPushConsumer(CONSUME_GROUP);
            pushConsumer.subscribe(retryTopic, "*");
            pushConsumer.registerMessageListener((msgs, context) -> {
                for (MessageExt msg : msgs) {
                    retryMsgList.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            pushConsumer.start();
            Awaitility.await()
                .atMost(Duration.ofMinutes(1L))
                .until(() -> retryMsgList.size() >= MESSAGE_COUNT && sendToIsolateMsgSet.containsAll(retryMsgList));
        } finally {
            try {
                // Restore the cluster: un-isolate master1, then re-add, start and un-isolate master2.
                cancelIsolatedBroker(master1With3Replicas);
                master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
                master2With3Replicas.start();
                cancelIsolatedBroker(master2With3Replicas);
                awaitUntilSlaveOK();
            } finally {
                if (pushConsumer != null) {
                    pushConsumer.shutdown();
                }
            }
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages to queue 0 of master1, then isolates master1 and
     * master2, removes master2 from {@code brokerContainer2}, and additionally isolates and
     * removes the slave in {@code brokerContainer3} that shares master1's broker name. It then
     * consumes all messages while answering {@code RECONSUME_LATER} and waits until at least
     * {@code MESSAGE_COUNT} messages with a known body have arrived on the POP retry topic.
     *
     * <p>All brokers are restored and the consumers are shut down even when a wait fails.
     *
     * @throws Exception if any client, admin or broker operation fails
     */
    @Test
    public void testRemoteActing_notAckSlave_getFromRemote() throws Exception {
        String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(topic);
        switchPop(topic);
        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
        createTopic(retryTopic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

        // Every message is built from MESSAGE_BODY, so this set ends up holding one distinct string.
        HashSet<String> sendToIsolateMsgSet = new HashSet<>();
        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message msg = new Message(topic, MESSAGE_BODY);
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                sendToIsolateMsgSet.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                sendSuccess++;
            }
        }
        // The counter cannot change after the loop, so assert immediately instead of polling it.
        Assertions.assertThat(sendSuccess).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        isolateBroker(master1With3Replicas);
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(
            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
            master2With3Replicas.getBrokerConfig().getBrokerName(),
            master2With3Replicas.getBrokerConfig().getBrokerId()));
        // Held as the common BrokerController type so it can be handed to the isolate/cancel helpers
        // and its configs reused when the broker is re-added during cleanup.
        BrokerController slave1InBrokerContainer3 = getSlaveFromContainerByName(brokerContainer3, master1With3Replicas.getBrokerConfig().getBrokerName());
        isolateBroker(slave1InBrokerContainer3);
        brokerContainer3.removeBroker(new BrokerIdentity(
            slave1InBrokerContainer3.getBrokerConfig().getBrokerClusterName(),
            slave1InBrokerContainer3.getBrokerConfig().getBrokerName(),
            slave1InBrokerContainer3.getBrokerConfig().getBrokerId()));

        DefaultMQPushConsumer pushConsumer = null;
        try {
            CopyOnWriteArrayList<String> consumedMessages = new CopyOnWriteArrayList<>();
            DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
            try {
                consumer.subscribe(topic, "*");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // NOTE(review): if registerMessageListener is overloaded in the client API, this implicitly
                // typed lambda may need an explicit listener-type cast (possibly dropped by the decompiler) — confirm.
                consumer.registerMessageListener((msgs, context) -> {
                    msgs.forEach(msg -> consumedMessages.add(msg.getMsgId()));
                    // Never acknowledge, so the messages are expected to be routed to the retry topic.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                });
                consumer.setClientRebalance(false);
                consumer.start();
                Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> consumedMessages.size() >= MESSAGE_COUNT);
            } finally {
                consumer.shutdown();
            }

            CopyOnWriteArrayList<String> retryMsgList = new CopyOnWriteArrayList<>();
            pushConsumer = createPushConsumer(CONSUME_GROUP);
            pushConsumer.subscribe(retryTopic, "*");
            pushConsumer.registerMessageListener((msgs, context) -> {
                for (MessageExt msg : msgs) {
                    retryMsgList.add(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            pushConsumer.start();
            Thread.sleep(10000L);
            Awaitility.await()
                .atMost(Duration.ofMinutes(1L))
                .until(() -> retryMsgList.size() >= MESSAGE_COUNT && sendToIsolateMsgSet.containsAll(retryMsgList));
        } finally {
            try {
                // Restore the cluster in the same order as the success path: master1, master2, then the slave.
                cancelIsolatedBroker(master1With3Replicas);
                master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
                master2With3Replicas.start();
                cancelIsolatedBroker(master2With3Replicas);
                BrokerController restoredSlave = brokerContainer3.addBroker(slave1InBrokerContainer3.getBrokerConfig(), slave1InBrokerContainer3.getMessageStoreConfig());
                restoredSlave.start();
                cancelIsolatedBroker(restoredSlave);
                awaitUntilSlaveOK();
            } finally {
                if (pushConsumer != null) {
                    pushConsumer.shutdown();
                }
            }
        }
    }

    /**
     * Sets the message request mode of {@code CONSUME_GROUP} on {@code topic} to
     * {@link MessageRequestMode#POP} at the address of every master broker and every slave
     * broker in every container of {@code brokerContainerList}.
     *
     * @param topic topic whose request mode is switched
     * @throws Exception if any admin call fails
     */
    private void switchPop(String topic) throws Exception {
        // NOTE(review): the trailing arguments 8 and 60000L are presumably a queue-share count and
        // a timeout in milliseconds — confirm against setMessageRequestMode.
        for (BrokerContainer container : brokerContainerList) {
            for (InnerBrokerController masterBroker : container.getMasterBrokers()) {
                defaultMQAdminExt.setMessageRequestMode(masterBroker.getBrokerAddr(), topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60000L);
            }
            for (InnerSalveBrokerController slaveBroker : container.getSlaveBrokers()) {
                String slaveAddr = slaveBroker.getBrokerAddr();
                defaultMQAdminExt.setMessageRequestMode(slaveAddr, topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60000L);
            }
        }
    }

    // Builds the shared payload once: a random string requested with length 1024 and its UTF-8 bytes.
    static {
        String randomText = RandomStringUtils.random(1024);
        MESSAGE_STRING = randomText;
        MESSAGE_BODY = randomText.getBytes(StandardCharsets.UTF_8);
    }
}

