/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class PullMultipleReplicasIT
extends ContainerIntegrationTestBase {
    private static DefaultMQPullConsumer pullConsumer;
    private static DefaultMQProducer producer;
    private static MQClientInstance mqClientInstance;
    private static final String MESSAGE_STRING;
    private static final byte[] MESSAGE_BODY;

    @BeforeClass
    public static void beforeClass() throws Exception {
        pullConsumer = PullMultipleReplicasIT.createPullConsumer(PullMultipleReplicasIT.class.getSimpleName() + "_Consumer");
        pullConsumer.start();
        Field field = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
        field.setAccessible(true);
        mqClientInstance = (MQClientInstance)field.get(pullConsumer.getDefaultMQPullConsumerImpl());
        producer = PullMultipleReplicasIT.createProducer(PullMultipleReplicasIT.class.getSimpleName() + "_Producer");
        producer.setSendMsgTimeout(15000);
        producer.start();
    }

    @AfterClass
    public static void afterClass() {
        producer.shutdown();
        pullConsumer.shutdown();
    }

    @Test
    public void testPullMessageFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        PullMultipleReplicasIT.awaitUntilSlaveOK();
        Message msg = new Message("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS", MESSAGE_BODY);
        SendResult sendResult = producer.send(msg);
        Java6Assertions.assertThat((Comparable)sendResult.getSendStatus()).isEqualTo((Object)SendStatus.SEND_OK);
        MessageQueue messageQueue = sendResult.getMessageQueue();
        long queueOffset = sendResult.getQueueOffset();
        PullResult[] pullResult = new PullResult[]{null};
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return pullResult[0].getPullStatus() == PullStatus.FOUND;
        });
        List msgFoundList = pullResult[0].getMsgFoundList();
        Java6Assertions.assertThat((int)msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat((String)new String(((MessageExt)msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 1L);
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return pullResult[0].getPullStatus() == PullStatus.FOUND;
        });
        msgFoundList = pullResult[0].getMsgFoundList();
        Java6Assertions.assertThat((int)msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat((String)new String(((MessageExt)msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 2L);
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return pullResult[0].getPullStatus() == PullStatus.FOUND;
        });
        msgFoundList = pullResult[0].getMsgFoundList();
        Java6Assertions.assertThat((int)msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat((String)new String(((MessageExt)msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 0L);
    }

    @Test
    public void testSendMessageBackToSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        PullMultipleReplicasIT.awaitUntilSlaveOK();
        String clusterTopic = "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK";
        PullMultipleReplicasIT.createTopicTo(master1With3Replicas, clusterTopic);
        PullMultipleReplicasIT.createTopicTo(master3With3Replicas, clusterTopic);
        Message msg = new Message(clusterTopic, MESSAGE_BODY);
        producer.setSendMsgTimeout(10000);
        MessageQueue[] selectedQueue = new MessageQueue[1];
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            for (MessageQueue queue : producer.fetchPublishMessageQueues(clusterTopic)) {
                if (!queue.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) continue;
                selectedQueue[0] = queue;
            }
            return selectedQueue[0] != null;
        });
        SendResult sendResult = producer.send(msg, selectedQueue[0]);
        Java6Assertions.assertThat((Comparable)sendResult.getSendStatus()).isEqualTo((Object)SendStatus.SEND_OK);
        MessageQueue messageQueue = sendResult.getMessageQueue();
        long queueOffset = sendResult.getQueueOffset();
        PullResult[] pullResult = new PullResult[]{null};
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return pullResult[0].getPullStatus() == PullStatus.FOUND;
        });
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            DefaultMessageStore messageStore = (DefaultMessageStore)master3With3Replicas.getMessageStore();
            return messageStore.getHaService().inSyncReplicasNums(messageStore.getMaxPhyOffset()) == 3;
        });
        InnerSalveBrokerController slaveBroker = null;
        for (InnerSalveBrokerController slave : brokerContainer1.getSlaveBrokers()) {
            if (!slave.getBrokerConfig().getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) continue;
            slaveBroker = slave;
        }
        Java6Assertions.assertThat(slaveBroker).isNotNull();
        MessageExt backMessage = (MessageExt)pullResult[0].getMsgFoundList().get(0);
        backMessage.setStoreHost((SocketAddress)new InetSocketAddress(slaveBroker.getBrokerConfig().getBrokerIP1(), slaveBroker.getBrokerConfig().getListenPort()));
        pullConsumer.sendMessageBack(backMessage, 0);
        String retryTopic = MixAll.getRetryTopic((String)pullConsumer.getConsumerGroup());
        MessageQueue newMsgQueue = new MessageQueue(retryTopic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            pullResult[0] = pullConsumer.pull(newMsgQueue, "*", 0L, 1);
            return pullResult[0].getPullStatus() == PullStatus.FOUND;
        });
        List msgFoundList = pullResult[0].getMsgFoundList();
        Java6Assertions.assertThat((int)msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat((String)new String(((MessageExt)msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(MESSAGE_STRING);
        PullMultipleReplicasIT.awaitUntilSlaveOK();
    }

    static {
        MESSAGE_STRING = RandomStringUtils.random((int)1024);
        MESSAGE_BODY = MESSAGE_STRING.getBytes(StandardCharsets.UTF_8);
    }
}

