/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class ScheduledMessageIT
extends ContainerIntegrationTestBase {
    private static DefaultMQProducer producer;
    private static final String CONSUME_GROUP;
    private static final String MESSAGE_STRING;
    private static final byte[] MESSAGE_BODY;
    private static final String TOPIC_PREFIX;
    private final Random random = new Random();
    private static final int MESSAGE_COUNT = 128;

    void createTopic(String topic) {
        ScheduledMessageIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        ScheduledMessageIT.createTopicTo(master2With3Replicas, topic, 1, 1);
        ScheduledMessageIT.createTopicTo(master3With3Replicas, topic, 1, 1);
    }

    @BeforeClass
    public static void beforeClass() throws Throwable {
        producer = ScheduledMessageIT.createProducer(ScheduledMessageIT.class.getSimpleName() + "_PRODUCER");
        producer.setSendMsgTimeout(5000);
        producer.start();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        producer.shutdown();
    }

    @Ignore
    @Test
    public void consumeScheduledMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + this.random.nextInt(65535);
        this.createTopic(topic);
        DefaultMQPushConsumer pushConsumer = ScheduledMessageIT.createPushConsumer(CONSUME_GROUP + this.random.nextInt(65535));
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            long period = System.currentTimeMillis() - ((MessageExt)msgs.get(0)).getBornTimestamp();
            if (Math.abs(period - 5000L) <= 1000L) {
                inTimeMsgCount.addAndGet(msgs.size());
            }
            receivedMsgCount.addAndGet(msgs.size());
            msgs.forEach(x -> System.out.printf(receivedMsgCount.get() + " cost " + period + " " + x + "%n", new Object[0]));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        for (int i = 0; i < 128; ++i) {
            Message msg = new Message(topic, MESSAGE_BODY);
            msg.setDelayTimeLevel(2);
            producer.send(msg);
        }
        Awaitility.await().atMost(Duration.ofSeconds(256L)).until(() -> receivedMsgCount.get() >= 128 && (double)inTimeMsgCount.get() >= 121.6);
        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
        pushConsumer.shutdown();
    }

    @Test
    public void consumeScheduledMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + this.random.nextInt(65535);
        this.createTopic(topic);
        DefaultMQPushConsumer pushConsumer = ScheduledMessageIT.createPushConsumer(CONSUME_GROUP + this.random.nextInt(65535));
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            msgs.forEach(x -> System.out.printf(x + "%n", new Object[0]));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        for (int i = 0; i < 128; ++i) {
            Message msg = new Message(topic, String.valueOf(i).getBytes());
            msg.setDelayTimeLevel(2);
            producer.send(msg);
        }
        ScheduledMessageIT.isolateBroker(master1With3Replicas);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
        Java6Assertions.assertThat((String)producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
        pushConsumer.start();
        Awaitility.await().atMost(Duration.ofSeconds(256L)).until(() -> receivedMsgCount.get() >= 128);
        pushConsumer.shutdown();
        ScheduledMessageIT.cancelIsolatedBroker(master1With3Replicas);
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() -> ((DefaultMessageStore)master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
    }

    @Test
    public void consumeTimerMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + this.random.nextInt(65535);
        this.createTopic(topic);
        DefaultMQPushConsumer pushConsumer = ScheduledMessageIT.createPushConsumer(CONSUME_GROUP);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        for (int i = 0; i < 128; ++i) {
            Message msg = new Message(topic, String.valueOf(i).getBytes());
            msg.setDelayTimeSec(3L);
            producer.send(msg);
        }
        ScheduledMessageIT.isolateBroker(master1With3Replicas);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
        Java6Assertions.assertThat((String)producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
        pushConsumer.start();
        Awaitility.await().atMost(Duration.ofSeconds(256L)).until(() -> receivedMsgCount.get() >= 128);
        pushConsumer.shutdown();
        ScheduledMessageIT.cancelIsolatedBroker(master1With3Replicas);
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(() -> ((DefaultMessageStore)master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
    }

    static {
        CONSUME_GROUP = ScheduledMessageIT.class.getSimpleName() + "_Consumer";
        MESSAGE_STRING = RandomStringUtils.random((int)1024);
        MESSAGE_BODY = MESSAGE_STRING.getBytes(StandardCharsets.UTF_8);
        TOPIC_PREFIX = ScheduledMessageIT.class.getSimpleName() + "_TOPIC";
    }
}

