/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.container.ContainerIntegrationTestBase;
import org.apache.rocketmq.test.container.TransactionListenerImpl;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class TransactionMessageIT
extends ContainerIntegrationTestBase {
    private static final String MESSAGE_STRING = RandomStringUtils.random((int)1024);
    private static byte[] messageBody;
    private static final int MESSAGE_COUNT = 16;

    private static String generateGroup() {
        return "GID-" + TransactionMessageIT.class.getSimpleName() + RandomStringUtils.randomNumeric((int)5);
    }

    @Test
    public void consumeTransactionMsg() throws MQClientException {
        String topic = TransactionMessageIT.generateTopic();
        TransactionMessageIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        String group = TransactionMessageIT.generateGroup();
        DefaultMQPushConsumer pushConsumer = TransactionMessageIT.createPushConsumer(group);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            receivedMsgCount.addAndGet(msgs.size());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        TransactionMQProducer producer = TransactionMessageIT.createTransactionProducer(group, new TransactionListenerImpl(false));
        producer.start();
        for (int i = 0; i < 16; ++i) {
            Message msg = new Message(topic, messageBody);
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
            Assertions.assertThat((Comparable)result.getLocalTransactionState()).isEqualTo((Object)LocalTransactionState.COMMIT_MESSAGE);
        }
        System.out.printf("send message complete%n", new Object[0]);
        Awaitility.await().atMost(Duration.ofSeconds(32L)).until(() -> receivedMsgCount.get() >= 16);
        System.out.printf("consumer received %d msg%n", receivedMsgCount.get());
        pushConsumer.shutdown();
        producer.shutdown();
    }

    private static String generateTopic() {
        return TransactionMessageIT.class.getSimpleName() + RandomStringUtils.randomNumeric((int)5);
    }

    @Test
    public void consumeTransactionMsgLocalEscape() throws Exception {
        String topic = TransactionMessageIT.generateTopic();
        TransactionMessageIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        String group = TransactionMessageIT.generateGroup();
        DefaultMQPushConsumer pushConsumer = TransactionMessageIT.createPushConsumer(group);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        HashMap<String, Message> msgSentMap = new HashMap<String, Message>();
        pushConsumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                if (!msgSentMap.containsKey(msg.getMsgId())) continue;
                receivedMsgCount.incrementAndGet();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        TransactionListenerImpl transactionCheckListener = new TransactionListenerImpl(true);
        TransactionMQProducer producer = TransactionMessageIT.createTransactionProducer(group, transactionCheckListener);
        producer.start();
        for (int i = 0; i < 16; ++i) {
            Message msg = new Message(topic, messageBody);
            msg.setKeys(UUID.randomUUID().toString());
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
            String msgId = result.getMsgId();
            msgSentMap.put(msgId, msg);
        }
        TransactionMessageIT.isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(), master1With3Replicas.getBrokerIdentity().getBrokerName(), master1With3Replicas.getBrokerIdentity().getBrokerId()));
        System.out.printf("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-" + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n", new Object[0]);
        TransactionMessageIT.createTopicTo(master2With3Replicas, topic, 1, 1);
        transactionCheckListener.setShouldReturnUnknownState(false);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
        System.out.printf("Wait for consuming%n", new Object[0]);
        Awaitility.await().atMost(Duration.ofSeconds(300L)).until(() -> receivedMsgCount.get() >= 16);
        System.out.printf("consumer received %d msg%n", receivedMsgCount.get());
        pushConsumer.shutdown();
        producer.shutdown();
        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        TransactionMessageIT.cancelIsolatedBroker(master1With3Replicas);
        TransactionMessageIT.awaitUntilSlaveOK();
        receivedMsgCount.set(0);
        DefaultMQPushConsumer pushConsumer2 = TransactionMessageIT.createPushConsumer(group);
        pushConsumer2.subscribe(topic, "*");
        pushConsumer2.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                if (!msgSentMap.containsKey(msg.getMsgId())) continue;
                receivedMsgCount.incrementAndGet();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer2.start();
        System.out.printf("Wait for checking...%n", new Object[0]);
        Thread.sleep(10000L);
    }

    @Test
    public void consumeTransactionMsgRemoteEscape() throws Exception {
        String topic = TransactionMessageIT.generateTopic();
        TransactionMessageIT.createTopicTo(master1With3Replicas, topic, 1, 1);
        String group = TransactionMessageIT.generateGroup();
        AtomicInteger receivedMsgCount = new AtomicInteger(0);
        HashMap<String, Message> msgSentMap = new HashMap<String, Message>();
        DefaultMQPushConsumer pushConsumer = TransactionMessageIT.createPushConsumer(group);
        pushConsumer.subscribe(topic, "*");
        pushConsumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                if (!msgSentMap.containsKey(msg.getMsgId())) continue;
                receivedMsgCount.incrementAndGet();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.start();
        TransactionListenerImpl transactionCheckListener = new TransactionListenerImpl(true);
        TransactionMQProducer producer = TransactionMessageIT.createTransactionProducer(group, transactionCheckListener);
        producer.start();
        for (int i = 0; i < 16; ++i) {
            Message msg = new Message(topic, messageBody);
            msg.setKeys(UUID.randomUUID().toString());
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
            String msgId = result.getMsgId();
            msgSentMap.put(msgId, msg);
        }
        TransactionMessageIT.isolateBroker(master1With3Replicas);
        brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(), master1With3Replicas.getBrokerIdentity().getBrokerName(), master1With3Replicas.getBrokerIdentity().getBrokerId()));
        System.out.printf("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-" + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n", new Object[0]);
        TransactionMessageIT.createTopicTo(master2With3Replicas, topic, 1, 1);
        TransactionMessageIT.createTopicTo(master3With3Replicas, topic, 1, 1);
        brokerContainer2.removeBroker(new BrokerIdentity(master2With3Replicas.getBrokerIdentity().getBrokerClusterName(), master2With3Replicas.getBrokerIdentity().getBrokerName(), master2With3Replicas.getBrokerIdentity().getBrokerId()));
        System.out.printf("=========" + master2With3Replicas.getBrokerIdentity().getBrokerClusterName() + "-" + master2With3Replicas.getBrokerIdentity().getBrokerName() + "-" + master2With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n", new Object[0]);
        pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().doRebalance(false);
        transactionCheckListener.setShouldReturnUnknownState(false);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
        System.out.printf("Wait for consuming%n", new Object[0]);
        Awaitility.await().atMost(Duration.ofSeconds(180L)).until(() -> receivedMsgCount.get() >= 16);
        System.out.printf("consumer received %d msg%n", receivedMsgCount.get());
        pushConsumer.shutdown();
        producer.shutdown();
        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
        master1With3Replicas.start();
        TransactionMessageIT.cancelIsolatedBroker(master1With3Replicas);
        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
        master2With3Replicas.start();
        TransactionMessageIT.cancelIsolatedBroker(master2With3Replicas);
        TransactionMessageIT.awaitUntilSlaveOK();
        receivedMsgCount.set(0);
        DefaultMQPushConsumer pushConsumer2 = TransactionMessageIT.createPushConsumer(group);
        pushConsumer2.subscribe(topic, "*");
        pushConsumer2.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                if (!msgSentMap.containsKey(msg.getMsgId())) continue;
                receivedMsgCount.incrementAndGet();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer2.start();
        System.out.printf("Wait for checking...%n", new Object[0]);
        Thread.sleep(10000L);
        Assertions.assertThat((int)receivedMsgCount.get()).isEqualTo(0);
        pushConsumer2.shutdown();
    }

    static {
        try {
            messageBody = MESSAGE_STRING.getBytes("UTF-8");
        }
        catch (UnsupportedEncodingException unsupportedEncodingException) {
            // empty catch block
        }
    }
}

