/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.time.Duration;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.apache.rocketmq.test.container.PushMultipleReplicasIT;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class GetMetadataReverseIT
extends ContainerIntegrationTestBase {
    private static DefaultMQProducer producer;
    private static final String CONSUMER_GROUP;
    private static final int MESSAGE_COUNT = 32;
    private final Random random = new Random();

    @BeforeClass
    public static void beforeClass() throws Throwable {
        producer = GetMetadataReverseIT.createProducer(PushMultipleReplicasIT.class.getSimpleName() + "_PRODUCER");
        producer.setSendMsgTimeout(15000);
        producer.start();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        producer.shutdown();
    }

    @Test
    public void testGetMetadataReverse_consumerOffset() throws Exception {
        String topic = GetMetadataReverseIT.class.getSimpleName() + "_consumerOffset" + this.random.nextInt(65535);
        GetMetadataReverseIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            InnerSalveBrokerController slaveBroker = (InnerSalveBrokerController)brokerContainer2.getSlaveBrokers().iterator().next();
            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
        });
        int sendSuccess = 0;
        for (int i = 0; i < 32; ++i) {
            Message msg = new Message(topic, Integer.toString(i).getBytes());
            SendResult sendResult = producer.send(msg);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) continue;
            ++sendSuccess;
        }
        int finalSendSuccess = sendSuccess;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> finalSendSuccess >= 32);
        GetMetadataReverseIT.isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerConfig().getBrokerClusterName(), master1With3Replicas.getBrokerConfig().getBrokerName(), master1With3Replicas.getBrokerConfig().getBrokerId()));
        DefaultMQPushConsumer pushConsumer = GetMetadataReverseIT.createPushConsumer(CONSUMER_GROUP);
        pushConsumer.subscribe(topic, "*");
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            msgs.forEach(x -> System.out.printf(x + "%n", new Object[0]));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> receivedMsgCount.get() >= 32);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
            Map slaveOffsetTable = null;
            for (InnerSalveBrokerController slave : brokerContainer2.getSlaveBrokers()) {
                if (!slave.getBrokerConfig().getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) continue;
                slaveOffsetTable = slave.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
            }
            if (slaveOffsetTable != null) {
                long totalOffset = 0L;
                for (Long offset : slaveOffsetTable.values()) {
                    totalOffset += offset.longValue();
                }
                return totalOffset >= 32L;
            }
            return false;
        });
        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        GetMetadataReverseIT.cancelIsolatedBroker(master1With3Replicas);
        GetMetadataReverseIT.awaitUntilSlaveOK();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            Map offsetTable = master1With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
            long totalOffset = 0L;
            if (offsetTable != null) {
                for (Long offset : offsetTable.values()) {
                    totalOffset += offset.longValue();
                }
            }
            return totalOffset >= 32L;
        });
        pushConsumer.shutdown();
    }

    @Test
    public void testGetMetadataReverse_delayOffset() throws Exception {
        String topic = GetMetadataReverseIT.class.getSimpleName() + "_delayOffset" + this.random.nextInt(65535);
        GetMetadataReverseIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        GetMetadataReverseIT.createTopicTo(master2With3Replicas, topic, 1, 1);
        GetMetadataReverseIT.createTopicTo(master3With3Replicas, topic, 1, 1);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            InnerSalveBrokerController slaveBroker = (InnerSalveBrokerController)brokerContainer2.getSlaveBrokers().iterator().next();
            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
        });
        int delayLevel = 4;
        DefaultMQPushConsumer pushConsumer = GetMetadataReverseIT.createPushConsumer(CONSUMER_GROUP);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            msgs.forEach(x -> System.out.printf(x + "%n", new Object[0]));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < 32; ++i) {
            Message msg = new Message(topic, Integer.toString(i).getBytes());
            msg.setDelayTimeLevel(delayLevel);
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) continue;
            ++sendSuccess;
        }
        int finalSendSuccess = sendSuccess;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> finalSendSuccess >= 32);
        GetMetadataReverseIT.isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerConfig().getBrokerClusterName(), master1With3Replicas.getBrokerConfig().getBrokerName(), master1With3Replicas.getBrokerConfig().getBrokerId()));
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> receivedMsgCount.get() >= 32);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
            Map offsetTable = master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
            if (offsetTable != null) {
                long totalOffset = 0L;
                for (Long offset : offsetTable.values()) {
                    totalOffset += offset.longValue();
                }
                return totalOffset >= 32L;
            }
            return false;
        });
        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        GetMetadataReverseIT.cancelIsolatedBroker(master1With3Replicas);
        GetMetadataReverseIT.awaitUntilSlaveOK();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            ConcurrentMap offsetTable = master1With3Replicas.getScheduleMessageService().getOffsetTable();
            return (Long)offsetTable.get(delayLevel) >= 32L;
        });
        pushConsumer.shutdown();
    }

    @Test
    public void testGetMetadataReverse_timerCheckPoint() throws Exception {
        String topic = GetMetadataReverseIT.class.getSimpleName() + "_timerCheckPoint" + this.random.nextInt(65535);
        GetMetadataReverseIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        GetMetadataReverseIT.createTopicTo(master2With3Replicas, topic, 1, 1);
        GetMetadataReverseIT.createTopicTo(master3With3Replicas, topic, 1, 1);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            InnerSalveBrokerController slaveBroker = (InnerSalveBrokerController)brokerContainer2.getSlaveBrokers().iterator().next();
            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
        });
        DefaultMQPushConsumer pushConsumer = GetMetadataReverseIT.createPushConsumer(CONSUMER_GROUP);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            msgs.forEach(x -> System.out.printf(x + "%n", new Object[0]));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int sendSuccess = 0;
        for (int i = 0; i < 32; ++i) {
            Message msg = new Message(topic, Integer.toString(i).getBytes());
            msg.setDelayTimeSec(30L);
            SendResult sendResult = producer.send(msg, messageQueue);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) continue;
            ++sendSuccess;
        }
        int finalSendSuccess = sendSuccess;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> finalSendSuccess >= 32);
        GetMetadataReverseIT.isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerConfig().getBrokerClusterName(), master1With3Replicas.getBrokerConfig().getBrokerName(), master1With3Replicas.getBrokerConfig().getBrokerId()));
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> receivedMsgCount.get() >= 32);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
            Map offsetTable = master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
            if (offsetTable != null) {
                long totalOffset = 0L;
                for (Long offset : offsetTable.values()) {
                    totalOffset += offset.longValue();
                }
                return totalOffset >= 32L;
            }
            return false;
        });
        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        GetMetadataReverseIT.cancelIsolatedBroker(master1With3Replicas);
        GetMetadataReverseIT.awaitUntilSlaveOK();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >= 32L);
        pushConsumer.shutdown();
    }

    static {
        CONSUMER_GROUP = GetMetadataReverseIT.class.getSimpleName() + "_Consumer";
    }
}

