/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration tests that send messages to the broker group exposed by the base class as
 * {@code master1With3Replicas} while its slave brokers are removed and re-added.
 *
 * <p>The class is annotated with {@link Ignore}, so JUnit skips it unless the annotation is removed.
 * All tests share one producer that is started in {@link #beforeClass()} and shut down in
 * {@link #afterClass()}.
 */
@Ignore
public class SendMultipleReplicasIT extends ContainerIntegrationTestBase {
    /** Topic every test in this class publishes to. */
    private static final String REPLICATED_TOPIC = "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS";
    private static final String MSG = "Hello RocketMQ ";
    private static final byte[] MESSAGE_BODY = MSG.getBytes(StandardCharsets.UTF_8);

    /** Upper bound, in seconds, for every wait on the replica group state. */
    private static final long STATE_TIMEOUT_SECONDS = 100L;

    private static DefaultMQProducer mqProducer;

    /**
     * Creates and starts the shared producer with a 15 second send timeout.
     *
     * @throws Exception if the producer cannot be created or started
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        mqProducer = createProducer("SendMultipleReplicasMessageIT_Producer");
        mqProducer.setSendMsgTimeout(15000);
        mqProducer.start();
    }

    /** Shuts the shared producer down if {@link #beforeClass()} managed to create it. */
    @AfterClass
    public static void afterClass() {
        if (mqProducer != null) {
            mqProducer.shutdown();
        }
    }

    /** Sends one message once {@code awaitUntilSlaveOK()} returns and expects {@code SEND_OK}. */
    @Test
    public void sendMessageToBrokerGroup() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        awaitUntilSlaveOK();
        SendResult sendResult = mqProducer.send(new Message(REPLICATED_TOPIC, MESSAGE_BODY));
        Java6Assertions.assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
    }

    /**
     * With auto in-sync replicas enabled, a send addressed to the master's queue is expected to
     * succeed even after both slave brokers have been removed.
     *
     * @throws Exception if a send, a broker operation or a state wait fails
     */
    @Test
    public void sendMessage_Auto_Replicas_Success() throws Exception {
        awaitReplicaState(2, 3);

        // Baseline: the send succeeds while all replicas are present.
        SendResult sendResult = mqProducer.send(new Message(REPLICATED_TOPIC, MESSAGE_BODY));
        Java6Assertions.assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);

        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
        try {
            awaitReplicaState(0, 1);
            master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(true);

            MessageQueue targetMq = findQueueOnMaster();
            Java6Assertions.assertThat(targetMq).isNotNull();

            sendResult = mqProducer.send(new Message(REPLICATED_TOPIC, MESSAGE_BODY), targetMq);
            Java6Assertions.assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
        } finally {
            // Re-add the slaves even when an assertion fails so later tests see the full group.
            restoreSlaves();
        }
    }

    /**
     * With auto in-sync replicas disabled, a send addressed to the master's queue is expected to
     * be rejected with an {@link MQBrokerException} after both slave brokers have been removed.
     *
     * @throws Exception if a broker operation or a state wait fails, or the send fails with
     *                   anything other than {@link MQBrokerException}
     */
    @Test
    public void sendMessage_Auto_Replicas_Failed() throws Exception {
        awaitReplicaState(2, 3);

        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
        try {
            awaitReplicaState(0, 1);
            master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(false);

            MessageQueue targetMq = findQueueOnMaster();
            Java6Assertions.assertThat(targetMq).isNotNull();

            boolean exceptionCaught = false;
            try {
                mqProducer.send(new Message(REPLICATED_TOPIC, MESSAGE_BODY), targetMq);
            } catch (MQBrokerException expected) {
                exceptionCaught = true;
            }
            Java6Assertions.assertThat(exceptionCaught).isTrue();
        } finally {
            // Re-add the slaves even when an assertion fails so later tests see the full group.
            restoreSlaves();
        }
    }

    /**
     * Blocks until the master reports the given HA connection count and alive replica count, or
     * fails once {@link #STATE_TIMEOUT_SECONDS} has elapsed.
     *
     * @param expectedHaConnections value expected from the HA service connection counter
     * @param expectedAliveReplicas value expected from {@code getAliveReplicaNumInGroup()}
     */
    private void awaitReplicaState(int expectedHaConnections, int expectedAliveReplicas) {
        Awaitility.await().atMost(STATE_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            int haConnections = ((DefaultMessageStore) master1With3Replicas.getMessageStore())
                .getHaService().getConnectionCount().get();
            return haConnections == expectedHaConnections
                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == expectedAliveReplicas;
        });
    }

    /**
     * Looks up a publish queue of {@link #REPLICATED_TOPIC} whose broker name equals the master's.
     *
     * @return the last matching queue in the fetched list, or {@code null} when none matches
     * @throws Exception if the publish queues cannot be fetched
     */
    private MessageQueue findQueueOnMaster() throws Exception {
        String masterName = master1With3Replicas.getBrokerConfig().getBrokerName();
        List<MessageQueue> publishQueues =
            mqProducer.getDefaultMQProducerImpl().fetchPublishMessageQueues(REPLICATED_TOPIC);
        MessageQueue match = null;
        for (MessageQueue queue : publishQueues) {
            if (queue.getBrokerName().equals(masterName)) {
                match = queue;
            }
        }
        return match;
    }

    /**
     * Re-creates both slave brokers and waits until the master again reports 2 HA connections
     * and 3 alive replicas.
     *
     * @throws Exception if a slave cannot be created or the group does not recover in time
     */
    private void restoreSlaves() throws Exception {
        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
        createAndAddSlave(2, brokerContainer3, master1With3Replicas);
        awaitReplicaState(2, 3);
    }
}

