/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration tests for scheduled messages when the master that stored them goes away.
 *
 * <p>Every test sends {@link #MESSAGE_COUNT} messages, scheduled roughly 30 seconds ahead, to
 * queue 0 of master1, then isolates and removes master1 (the "remote" tests also remove
 * master2). The test passes when a push consumer receives all messages and at least 95% of
 * them arrive within a per-test tolerance of the 30 second mark. The "remote" tests
 * additionally require every message to carry master3's broker name.
 *
 * <p>Removed brokers are always added back in a {@code finally} block so that a failing test
 * does not leave the shared cluster degraded for the tests that run after it.
 */
@Ignore
public class ScheduleSlaveActingMasterIT extends ContainerIntegrationTestBase {
    private static final String CONSUME_GROUP = ScheduleSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
    /** Number of messages sent (and expected back) per test. */
    private static final int MESSAGE_COUNT = 32;
    /** At least this many messages (95%) must arrive inside the tolerance window. */
    private static final double MIN_IN_TIME_COUNT = MESSAGE_COUNT * 0.95;
    /** Delay, in milliseconds, that delivery latency is compared against. */
    private static final long EXPECTED_DELAY_MS = 30000L;
    /** Delay level used by the delay-message tests; presumably 30s under the broker's default level table - confirm. */
    private static final int DELAY_LEVEL = 4;
    /** Delay, in seconds, used by the timer-message tests. */
    private static final long TIMER_DELAY_SEC = 30L;
    private static final String MESSAGE_STRING = RandomStringUtils.random(1024);
    private static final byte[] MESSAGE_BODY = MESSAGE_STRING.getBytes(StandardCharsets.UTF_8);

    private static DefaultMQProducer producer;

    private final Random random = new Random();

    /**
     * Creates {@code topic} on all three masters with one read queue and one write queue each.
     *
     * @param topic name of the topic to create
     */
    void createTopic(String topic) {
        createTopicTo(master1With3Replicas, topic, 1, 1);
        createTopicTo(master2With3Replicas, topic, 1, 1);
        createTopicTo(master3With3Replicas, topic, 1, 1);
    }

    /** Starts the producer shared by all tests, with a 5 second send timeout. */
    @BeforeClass
    public static void beforeClass() throws Throwable {
        producer = createProducer(ScheduleSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER");
        producer.setSendMsgTimeout(5000);
        producer.start();
    }

    /** Shuts the shared producer down. */
    @AfterClass
    public static void afterClass() throws Exception {
        producer.shutdown();
    }

    /** Delay-level messages, master1 removed, 4s tolerance. */
    @Test
    public void testLocalActing_delayMsg() throws Exception {
        runLocalActing(false, 4000L, 30000L);
    }

    /** Timer messages, master1 removed, 1s tolerance. */
    @Test
    public void testLocalActing_timerMsg() throws Exception {
        runLocalActing(true, 1000L, 20000L);
    }

    /** Delay-level messages, master1 and master2 removed, 4s tolerance, delivery expected from master3. */
    @Test
    public void testRemoteActing_delayMsg() throws Exception {
        runRemoteActing(false, 4000L, Duration.ofMinutes(2L), 30000L);
    }

    /** Timer messages, master1 and master2 removed, 3s tolerance, delivery expected from master3. */
    @Test
    public void testRemoteActing_timerMsg() throws Exception {
        runRemoteActing(true, 3000L, Duration.ofMinutes(1L), 20000L);
    }

    /**
     * Shared body of the "local acting" tests: the consumer is started first, the messages are
     * sent, master1 is removed, and delivery latency is measured from each batch's born timestamp.
     *
     * @param useTimer    {@code true} to schedule with {@code setDelayTimeSec}, {@code false} for a delay level
     * @param toleranceMs allowed deviation from {@link #EXPECTED_DELAY_MS}
     * @param settleMs    pause after the cluster is restored (kept from the original test;
     *                    presumably lets the cluster settle before the next test - confirm)
     */
    private void runLocalActing(boolean useTimer, long toleranceMs, long settleMs) throws Exception {
        awaitUntilSlaveOK();
        String topic = newTopic();
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        AtomicInteger inTimeMsgCount = new AtomicInteger(0);

        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
        pushConsumer.subscribe(topic, "*");
        // The cast selects the concurrent overload; without it the implicitly typed lambda
        // matches the orderly overload equally well and the call is ambiguous.
        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
            if (Math.abs(period - EXPECTED_DELAY_MS) <= toleranceMs) {
                inTimeMsgCount.addAndGet(msgs.size());
            }
            receivedMsgCount.addAndGet(msgs.size());
            // Print the message as an argument, never as the format string itself.
            msgs.forEach(x -> System.out.printf("%s%n", x));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();

        boolean master1Removed = false;
        try {
            sendScheduledMessages(topic, useTimer);
            removeMaster1();
            master1Removed = true;
            Awaitility.await().atMost(Duration.ofMinutes(1L)).until(
                () -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MIN_IN_TIME_COUNT);
            System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
        } finally {
            cleanUp(pushConsumer, master1Removed, false);
        }
        awaitUntilSlaveOK();
        Thread.sleep(settleMs);
    }

    /**
     * Shared body of the "remote acting" tests: the messages are sent first, then the consumer
     * is started, master1 and master2 are removed, and delivery latency is measured from the
     * moment sending completed. Every message must be reported with master3's broker name.
     *
     * @param useTimer       {@code true} to schedule with {@code setDelayTimeSec}, {@code false} for a delay level
     * @param toleranceMs    allowed deviation from {@link #EXPECTED_DELAY_MS}
     * @param receiveTimeout how long to wait for all messages to arrive
     * @param settleMs       pause after the cluster is restored (kept from the original test;
     *                       presumably lets the cluster settle before the next test - confirm)
     */
    private void runRemoteActing(boolean useTimer, long toleranceMs, Duration receiveTimeout, long settleMs)
        throws Exception {
        awaitUntilSlaveOK();
        String topic = newTopic();
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
        AtomicInteger master3MsgCount = new AtomicInteger(0);

        sendScheduledMessages(topic, useTimer);
        long sendCompleteTimeStamp = System.currentTimeMillis();

        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
        pushConsumer.subscribe(topic, "*");
        // The cast selects the concurrent overload; without it the implicitly typed lambda
        // matches the orderly overload equally well and the call is ambiguous.
        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            long period = System.currentTimeMillis() - sendCompleteTimeStamp;
            if (Math.abs(period - EXPECTED_DELAY_MS) <= toleranceMs) {
                inTimeMsgCount.addAndGet(msgs.size());
            }
            if (msgs.get(0).getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
                master3MsgCount.addAndGet(msgs.size());
            }
            receivedMsgCount.addAndGet(msgs.size());
            // Print the message as an argument, never as the format string itself.
            msgs.forEach(x -> System.out.printf("cost %d %s%n", period, x));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();

        boolean master1Removed = false;
        boolean master2Removed = false;
        try {
            removeMaster1();
            master1Removed = true;
            removeMaster2();
            master2Removed = true;
            Awaitility.await().atMost(receiveTimeout).until(
                () -> receivedMsgCount.get() >= MESSAGE_COUNT
                    && master3MsgCount.get() >= MESSAGE_COUNT
                    && inTimeMsgCount.get() >= MIN_IN_TIME_COUNT);
            System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
        } finally {
            cleanUp(pushConsumer, master1Removed, master2Removed);
        }
        awaitUntilSlaveOK();
        Thread.sleep(settleMs);
    }

    /** Builds a topic name from the class name plus a random suffix and creates it on all masters. */
    private String newTopic() {
        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + this.random.nextInt(65535);
        this.createTopic(topic);
        return topic;
    }

    /**
     * Sends {@link #MESSAGE_COUNT} scheduled messages to queue 0 of master1 and fails
     * immediately unless every send reports {@code SEND_OK}.
     *
     * @param topic    topic to send to
     * @param useTimer {@code true} to schedule with {@code setDelayTimeSec}, {@code false} for a delay level
     */
    private static void sendScheduledMessages(String topic, boolean useTimer) throws Exception {
        MessageQueue messageQueue =
            new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            Message msg = new Message(topic, MESSAGE_BODY);
            if (useTimer) {
                msg.setDelayTimeSec(TIMER_DELAY_SEC);
            } else {
                msg.setDelayTimeLevel(DELAY_LEVEL);
            }
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                ++sendSuccess;
            }
        }
        // The count cannot change once the loop is done, so assert instead of polling.
        Assert.assertEquals("not every message was sent with SEND_OK", MESSAGE_COUNT, sendSuccess);
        System.out.printf("send success%n");
    }

    /** Isolates master1 and removes it from broker container 1. */
    private static void removeMaster1() throws Exception {
        isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(
            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
            master1With3Replicas.getBrokerConfig().getBrokerName(),
            master1With3Replicas.getBrokerConfig().getBrokerId()));
        System.out.printf("Remove master1%n");
    }

    /** Isolates master2 and removes it from broker container 2. */
    private static void removeMaster2() throws Exception {
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(
            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
            master2With3Replicas.getBrokerConfig().getBrokerName(),
            master2With3Replicas.getBrokerConfig().getBrokerId()));
        System.out.printf("Remove master2%n");
    }

    /** Re-adds master1 to broker container 1 with its previous configs, starts it and lifts the isolation. */
    private static void restoreMaster1() throws Exception {
        master1With3Replicas = brokerContainer1.addBroker(
            master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        cancelIsolatedBroker(master1With3Replicas);
        System.out.printf("Add back master1%n");
    }

    /** Re-adds master2 to broker container 2 with its previous configs, starts it and lifts the isolation. */
    private static void restoreMaster2() throws Exception {
        master2With3Replicas = brokerContainer2.addBroker(
            master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
        master2With3Replicas.start();
        cancelIsolatedBroker(master2With3Replicas);
        System.out.printf("Add back master2%n");
    }

    /**
     * Shuts the consumer down and re-adds whichever masters were removed. Each step runs even
     * if an earlier one throws, so one cleanup failure does not skip the rest.
     *
     * @param pushConsumer   consumer to shut down
     * @param master1Removed whether master1 was removed and must be restored
     * @param master2Removed whether master2 was removed and must be restored
     */
    private static void cleanUp(DefaultMQPushConsumer pushConsumer, boolean master1Removed, boolean master2Removed)
        throws Exception {
        try {
            pushConsumer.shutdown();
        } finally {
            try {
                if (master1Removed) {
                    restoreMaster1();
                }
            } finally {
                if (master2Removed) {
                    restoreMaster2();
                }
            }
        }
    }
}

