/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.grpc.v2;

import apache.rocketmq.v2.AckMessageEntry;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.AckMessageResultEntry;
import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Encoding;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.MessagingServiceGrpc;
import apache.rocketmq.v2.Publishing;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.RetryPolicy;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SystemProperties;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.TransactionResolution;
import apache.rocketmq.v2.TransactionSource;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.net.URL;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.RandomUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;

public class GrpcBaseIT
extends BaseConf {
    // Port of the test gRPC server; stays 0 until setUpServer stores the port reported by the started server.
    private int port = 0;
    // JUnit rule with which setUpServer and createChannel register the server and channels they create.
    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
    // Blocking stub assigned by setUpServer; used by the tests for unary and server-streaming calls.
    protected MessagingServiceGrpc.MessagingServiceBlockingStub blockingStub;
    // Asynchronous stub assigned by setUpServer; used for the bidirectional telemetry stream.
    protected MessagingServiceGrpc.MessagingServiceStub stub;
    // Request headers filled in by setUp and attached to every call made through the stubs created here.
    protected final Metadata header = new Metadata();
    // NOTE(review): presumably the default queue count per topic; not referenced in the visible part of the file — confirm against its usages.
    protected static final int DEFAULT_QUEUE_NUMS = 8;

    /**
     * Prepares the shared test environment: shortens the transaction check interval on the
     * three brokers, fills the client headers, points {@code RMQ_PROXY_HOME} at the proxy home
     * directory and initializes and tunes the proxy configuration.
     *
     * @throws Exception if the proxy home resource URL cannot be converted to a URI or any of
     *                   the configuration calls fails
     */
    public void setUp() throws Exception {
        final long transactionCheckIntervalMillis = 3000L;
        brokerController1.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);
        brokerController2.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);
        brokerController3.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);

        // Every call made through the stubs carries a unique client id and the client language.
        this.header.put(InterceptorConstants.CLIENT_ID, "client-id" + UUID.randomUUID());
        this.header.put(InterceptorConstants.LANGUAGE, "JAVA");

        // Prefer the "rmq-proxy-home" classpath resource; otherwise fall back to a mock path.
        URL mockProxyHomeURL = this.getClass().getClassLoader().getResource("rmq-proxy-home");
        String mockProxyHome = mockProxyHomeURL == null
            ? "/mock/rmq/proxy/home"
            : mockProxyHomeURL.toURI().getPath();
        if (mockProxyHome != null) {
            System.setProperty("RMQ_PROXY_HOME", mockProxyHome);
        }

        ConfigurationManager.initEnv();
        ConfigurationManager.intConfig();

        String clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
        ConfigurationManager.getProxyConfig().setNamesrvAddr(NAMESRV_ADDR);
        ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500L);
        ConfigurationManager.getProxyConfig().setRocketMQClusterName(clusterName);
        ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(clusterName);
        ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3L);
        ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0L);
    }

    /**
     * Creates an asynchronous messaging stub on the given channel that attaches
     * {@link #header} to every call.
     *
     * @param channel channel the stub sends its calls over
     * @return the asynchronous stub with the header-attaching interceptor installed
     */
    protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel channel) {
        ClientInterceptor attachHeaders = MetadataUtils.newAttachHeadersInterceptor(this.header);
        return MessagingServiceGrpc.newStub(channel).withInterceptors(attachHeaders);
    }

    /**
     * Creates a blocking messaging stub on the given channel that attaches
     * {@link #header} to every call.
     *
     * @param channel channel the stub sends its calls over
     * @return the blocking stub with the header-attaching interceptor installed
     */
    protected MessagingServiceGrpc.MessagingServiceBlockingStub createBlockingStub(Channel channel) {
        ClientInterceptor attachHeaders = MetadataUtils.newAttachHeadersInterceptor(this.header);
        return MessagingServiceGrpc.newBlockingStub(channel).withInterceptors(attachHeaders);
    }

    /**
     * Opens a telemetry stream, sends the given client settings over it and returns a future
     * for the settings the server sends back. The request side of the stream is closed as soon
     * as the future completes.
     *
     * <p>The future completes exceptionally if the stream fails or is closed by the server
     * before a settings command arrives, so callers blocking on {@code get()} do not hang.
     *
     * @param stub           asynchronous stub used to open the telemetry stream
     * @param clientSettings settings to send to the server
     * @return future holding the settings returned by the server
     */
    protected CompletableFuture<Settings> sendClientSettings(MessagingServiceGrpc.MessagingServiceStub stub, Settings clientSettings) {
        final CompletableFuture<Settings> future = new CompletableFuture<>();
        StreamObserver<TelemetryCommand> requestStreamObserver = stub.telemetry(new DefaultTelemetryCommandStreamObserver() {

            @Override
            public void onNext(TelemetryCommand value) {
                if (TelemetryCommand.CommandCase.SETTINGS.equals(value.getCommandCase())) {
                    future.complete(value.getSettings());
                }
            }

            @Override
            public void onError(Throwable t) {
                super.onError(t);
                // Surface stream failures to the caller instead of leaving the future pending.
                future.completeExceptionally(t);
            }

            @Override
            public void onCompleted() {
                super.onCompleted();
                // No effect when settings were already received; otherwise report the early close.
                future.completeExceptionally(
                    new IllegalStateException("telemetry stream completed before settings were received"));
            }
        });
        requestStreamObserver.onNext(TelemetryCommand.newBuilder().setSettings(clientSettings).build());
        future.whenComplete((settings, throwable) -> requestStreamObserver.onCompleted());
        return future;
    }

    /**
     * Starts a TLS-enabled gRPC server for the given service implementation using a freshly
     * generated self-signed certificate, records the bound port in the proxy configuration and
     * creates the blocking and asynchronous stubs connected to it.
     *
     * @param serverImpl        service implementation to expose
     * @param port              port to bind the server to
     * @param enableInterceptor whether to install the context and header interceptors
     * @throws IOException          if the server cannot be started
     * @throws CertificateException if the self-signed certificate cannot be generated
     */
    protected void setUpServer(MessagingServiceGrpc.MessagingServiceImplBase serverImpl, int port, boolean enableInterceptor) throws IOException, CertificateException {
        SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();

        ServerInterceptor[] interceptors = enableInterceptor
            ? new ServerInterceptor[] {new ContextInterceptor(), new HeaderInterceptor()}
            : new ServerInterceptor[0];
        ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(serverImpl, interceptors);

        Server server = NettyServerBuilder.forPort(port)
            .directExecutor()
            .addService(serviceDefinition)
            .useTransportSecurity(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
            .build()
            .start();

        // Use the port reported by the started server rather than the requested one.
        this.port = server.getPort();
        this.grpcCleanup.register(server);
        ConfigurationManager.getProxyConfig().setGrpcServerPort(this.port);

        int grpcServerPort = ConfigurationManager.getProxyConfig().getGrpcServerPort();
        this.blockingStub = this.createBlockingStub(this.createChannel(grpcServerPort));
        this.stub = this.createStub(this.createChannel(grpcServerPort));
    }

    /**
     * Creates a TLS channel to {@code 127.0.0.1} on the given port and registers it with
     * {@link #grpcCleanup}. The client uses the OpenSSL provider, trusts any server certificate
     * and advertises {@code h2} through ALPN.
     *
     * @param port port of the server to connect to
     * @return the registered channel
     * @throws SSLException if the client SSL context cannot be built
     */
    protected Channel createChannel(int port) throws SSLException {
        ApplicationProtocolConfig alpnConfig = new ApplicationProtocolConfig(
            ApplicationProtocolConfig.Protocol.ALPN,
            ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
            ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
            "h2");
        NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress("127.0.0.1", port)
            .directExecutor()
            .sslContext(SslContextBuilder.forClient()
                .sslProvider(SslProvider.OPENSSL)
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .applicationProtocolConfig(alpnConfig)
                .build());
        return this.grpcCleanup.register(channelBuilder.build());
    }

    /**
     * Queries the assignment of a freshly created topic for the group {@code "group"} and
     * passes the response to {@code assertQueryAssignment} with an expected count of 3.
     *
     * @throws Exception if topic creation or the query fails
     */
    public void testQueryAssignment() throws Exception {
        String topic = initTopic();
        QueryAssignmentRequest request = this.buildQueryAssignmentRequest(topic, "group");
        this.assertQueryAssignment(this.blockingStub.queryAssignment(request), 3);
    }

    /**
     * Marks a random consumer group as orderly on all three brokers, queries the assignment of
     * a FIFO topic for that group and passes the response to {@code assertQueryAssignment}
     * with an expected count of 24.
     *
     * @throws Exception if topic creation or the query fails
     */
    public void testQueryFifoAssignment() throws Exception {
        String topic = initTopic(TopicMessageType.FIFO);
        String group = MQRandomUtils.getRandomConsumerGroup();

        SubscriptionGroupConfig orderlyConfig =
            brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
        orderlyConfig.setConsumeMessageOrderly(true);
        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);
        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);
        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);

        QueryAssignmentRequest request = this.buildQueryAssignmentRequest(topic, group);
        this.assertQueryAssignment(this.blockingStub.queryAssignment(request), 24);
    }

    /**
     * Sends a transactional message, waits for the server to push a recover-orphaned-transaction
     * command over the telemetry stream, commits the transaction and then verifies that the
     * message becomes receivable by the consumer group.
     *
     * <p>The telemetry stream is always closed in the {@code finally} block.
     */
    public void testTransactionCheckThenCommit() {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.TRANSACTION);
        String group = MQRandomUtils.getRandomConsumerGroup();

        // Holds the most recent telemetry command pushed by the server.
        final AtomicReference<TelemetryCommand> telemetryCommandRef = new AtomicReference<>();
        StreamObserver<TelemetryCommand> requestStreamObserver = this.stub.telemetry(new DefaultTelemetryCommandStreamObserver() {

            @Override
            public void onNext(TelemetryCommand value) {
                telemetryCommandRef.set(value);
            }
        });
        try {
            // Register as a push consumer and wait for the settings reply.
            requestStreamObserver.onNext(TelemetryCommand.newBuilder()
                .setSettings(this.buildPushConsumerClientSettings(group))
                .build());
            Awaitility.await().atMost(java.time.Duration.ofSeconds(3L)).until(() -> {
                TelemetryCommand latest = telemetryCommandRef.get();
                return latest != null
                    && latest.getCommandCase() == TelemetryCommand.CommandCase.SETTINGS;
            });
            telemetryCommandRef.set(null);

            this.receiveMessage(this.blockingStub, topic, group, 1);

            // Switch the same stream to producer settings, heartbeating until the reply arrives.
            requestStreamObserver.onNext(TelemetryCommand.newBuilder()
                .setSettings(this.buildProducerClientSettings(topic))
                .build());
            this.blockingStub.heartbeat(this.buildHeartbeatRequest(group));
            Awaitility.await().atMost(java.time.Duration.ofSeconds(3L)).until(() -> {
                TelemetryCommand latest = telemetryCommandRef.get();
                if (latest != null
                    && latest.getCommandCase() == TelemetryCommand.CommandCase.SETTINGS) {
                    return true;
                }
                this.blockingStub.heartbeat(this.buildHeartbeatRequest(group));
                return false;
            });
            telemetryCommandRef.set(null);

            String messageId = MessageClientIDSetter.createUniqID();
            SendMessageResponse sendResponse =
                this.blockingStub.sendMessage(this.buildTransactionSendMessageRequest(topic, messageId));
            this.assertSendMessage(sendResponse, messageId);

            // Keep heartbeating until the server asks the client to resolve the transaction.
            Awaitility.await().atMost(java.time.Duration.ofMinutes(2L)).until(() -> {
                TelemetryCommand latest = telemetryCommandRef.get();
                if (latest != null
                    && latest.getCommandCase() == TelemetryCommand.CommandCase.RECOVER_ORPHANED_TRANSACTION_COMMAND) {
                    return true;
                }
                this.blockingStub.heartbeat(this.buildHeartbeatRequest(group));
                return false;
            });
            RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
                telemetryCommandRef.get().getRecoverOrphanedTransactionCommand();
            this.assertRecoverOrphanedTransactionCommand(recoverOrphanedTransactionCommand, messageId);

            EndTransactionRequest commitRequest = this.buildEndTransactionRequest(
                topic, messageId, recoverOrphanedTransactionCommand.getTransactionId(), TransactionResolution.COMMIT);
            EndTransactionResponse endTransactionResponse = this.blockingStub.endTransaction(commitRequest);
            this.assertEndTransactionResponse(endTransactionResponse);

            // Back to consumer settings; the committed message must now be delivered.
            requestStreamObserver.onNext(TelemetryCommand.newBuilder()
                .setSettings(this.buildPushConsumerClientSettings(group))
                .build());
            Awaitility.await().atMost(java.time.Duration.ofSeconds(30L)).until(() -> {
                List<Message> received =
                    this.getMessageFromReceiveMessageResponse(this.receiveMessage(this.blockingStub, topic, group));
                return !received.isEmpty()
                    && received.get(0).getSystemProperties().getMessageId().equals(messageId);
            });
        } finally {
            requestStreamObserver.onCompleted();
        }
    }

    /**
     * Builds a heartbeat request whose only populated field is the consumer group.
     *
     * @param group name of the consumer group
     * @return the heartbeat request
     */
    public HeartbeatRequest buildHeartbeatRequest(String group) {
        Resource groupResource = Resource.newBuilder().setName(group).build();
        return HeartbeatRequest.newBuilder()
            .setGroup(groupResource)
            .build();
    }

    /**
     * Sends a message with a delivery timestamp five seconds in the future to a DELAY topic and
     * verifies that a simple consumer receives it within two seconds of the expected delivery
     * time.
     *
     * @throws Exception if exchanging client settings, sending or receiving fails
     */
    public void testSimpleConsumerSendAndRecvDelayMessage() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.DELAY);
        String group = MQRandomUtils.getRandomConsumerGroup();
        long delayTime = TimeUnit.SECONDS.toMillis(5L);

        // Receive once before sending so the consumer group has polled the topic.
        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        this.receiveMessage(this.blockingStub, topic, group, 1);

        this.sendClientSettings(this.stub, this.buildProducerClientSettings(topic)).get();
        String messageId = MessageClientIDSetter.createUniqID();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(messageId)
            .setQueueId(0)
            // A message with a delivery timestamp sent to a DELAY topic is a DELAY message.
            .setMessageType(MessageType.DELAY)
            .setBodyEncoding(Encoding.GZIP)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .setDeliveryTimestamp(Timestamps.fromMillis(System.currentTimeMillis() + delayTime))
            .build();
        SendMessageResponse sendResponse = this.blockingStub.sendMessage(SendMessageRequest.newBuilder()
            .addMessages(Message.newBuilder()
                .setTopic(Resource.newBuilder().setName(topic).build())
                .setSystemProperties(systemProperties)
                .setBody(ByteString.copyFromUtf8("hello"))
                .build())
            .build());
        long sendTime = System.currentTimeMillis();
        this.assertSendMessage(sendResponse, messageId);

        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        AtomicLong recvTime = new AtomicLong();
        Awaitility.await().atMost(java.time.Duration.ofSeconds(10L)).until(() -> {
            List<Message> messageList =
                this.getMessageFromReceiveMessageResponse(this.receiveMessage(this.blockingStub, topic, group));
            if (messageList.isEmpty()) {
                return false;
            }
            recvTime.set(System.currentTimeMillis());
            return messageList.get(0).getSystemProperties().getMessageId().equals(messageId);
        });

        // The observed delay may deviate from the requested one by less than two seconds.
        Assertions.assertThat(Math.abs(recvTime.get() - sendTime - delayTime) < 2000L).isTrue();
    }

    /**
     * Sends a 4096-character message and verifies that a simple consumer receives it with GZIP
     * body encoding and a body of the same size.
     *
     * @throws Exception if exchanging client settings, sending or receiving fails
     */
    public void testSimpleConsumerSendAndRecvBigMessage() throws Exception {
        final int bodySize = 4096;
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();

        // Receive once before sending so the consumer group has polled the topic.
        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        this.receiveMessage(this.blockingStub, topic, group, 1);

        this.sendClientSettings(this.stub, this.buildProducerClientSettings(topic)).get();
        String messageId = MessageClientIDSetter.createUniqID();
        SendMessageRequest bigMessageRequest = this.buildSendBigMessageRequest(topic, messageId, bodySize);
        this.assertSendMessage(this.blockingStub.sendMessage(bigMessageRequest), messageId);

        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        List<ReceiveMessageResponse> responses = this.receiveMessage(this.blockingStub, topic, group);
        Message message = this.assertAndGetReceiveMessage(responses, messageId);
        Assertions.assertThat(message.getSystemProperties().getBodyEncoding()).isEqualTo(Encoding.GZIP);
        Assertions.assertThat(message.getBody().size()).isEqualTo(bodySize);
    }

    /**
     * Sends a message, receives it, changes its invisible duration, waits for redelivery and
     * then acknowledges both receipt handles, expecting exactly one acknowledgement to succeed
     * and one to be rejected as an invalid receipt handle.
     *
     * @throws Exception if exchanging client settings, sending or receiving fails
     */
    public void testSimpleConsumerSendAndRecv() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();

        // Receive once before sending so the consumer group has polled the topic.
        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        this.receiveMessage(this.blockingStub, topic, group, 1);

        this.sendClientSettings(this.stub, this.buildProducerClientSettings(topic)).get();
        String messageId = MessageClientIDSetter.createUniqID();
        SendMessageResponse sendResponse =
            this.blockingStub.sendMessage(this.buildSendMessageRequest(topic, messageId));
        this.assertSendMessage(sendResponse, messageId);

        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        Message message =
            this.assertAndGetReceiveMessage(this.receiveMessage(this.blockingStub, topic, group), messageId);
        String receiptHandle = message.getSystemProperties().getReceiptHandle();
        ChangeInvisibleDurationResponse changeResponse = this.blockingStub.changeInvisibleDuration(
            this.buildChangeInvisibleDurationRequest(topic, group, receiptHandle, 5));
        this.assertChangeInvisibleDurationResponse(changeResponse, receiptHandle);

        // First handle comes from the duration change, second from the redelivery.
        List<String> ackHandles = new ArrayList<>();
        ackHandles.add(changeResponse.getReceiptHandle());
        Awaitility.await().atMost(java.time.Duration.ofSeconds(20L)).until(() -> {
            List<Message> redelivered =
                this.getMessageFromReceiveMessageResponse(this.receiveMessage(this.blockingStub, topic, group));
            if (redelivered.isEmpty()) {
                return false;
            }
            SystemProperties first = redelivered.get(0).getSystemProperties();
            if (!first.getMessageId().equals(messageId)) {
                return false;
            }
            ackHandles.add(first.getReceiptHandle());
            return true;
        });
        Assertions.assertThat(ackHandles.size()).isEqualTo(2);

        AckMessageEntry firstEntry = AckMessageEntry.newBuilder()
            .setMessageId(messageId)
            .setReceiptHandle(ackHandles.get(0))
            .build();
        AckMessageEntry secondEntry = AckMessageEntry.newBuilder()
            .setMessageId(messageId)
            .setReceiptHandle(ackHandles.get(1))
            .build();
        AckMessageResponse ackMessageResponse =
            this.blockingStub.ackMessage(this.buildAckMessageRequest(topic, group, firstEntry, secondEntry));
        Assertions.assertThat(ackMessageResponse.getStatus().getCode()).isEqualTo(Code.MULTIPLE_RESULTS);

        int okNum = 0;
        int expireNum = 0;
        for (AckMessageResultEntry entry : ackMessageResponse.getEntriesList()) {
            Code code = entry.getStatus().getCode();
            if (code == Code.OK) {
                okNum++;
            } else if (code == Code.INVALID_RECEIPT_HANDLE) {
                expireNum++;
            }
        }
        Assertions.assertThat(okNum).isEqualTo(1);
        Assertions.assertThat(expireNum).isEqualTo(1);
    }

    /**
     * Limits the consumer group to two delivery attempts, receives a message without
     * acknowledging it and verifies that it ends up in the group's dead-letter topic after
     * exactly two deliveries.
     *
     * <p>The pull consumer used to inspect the dead-letter queue is shut down when the wait
     * ends, whether it succeeds or fails.
     *
     * @throws Exception if exchanging client settings, sending, receiving or starting the pull
     *                   consumer fails
     */
    public void testSimpleConsumerToDLQ() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();
        int maxDeliveryAttempts = 2;

        SubscriptionGroupConfig groupConfig =
            brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
        // The first delivery is not a retry, hence one retry fewer than delivery attempts.
        groupConfig.setRetryMaxTimes(maxDeliveryAttempts - 1);
        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);

        // Receive once before sending so the consumer group has polled the topic.
        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        this.receiveMessage(this.blockingStub, topic, group, 1);

        this.sendClientSettings(this.stub, this.buildProducerClientSettings(topic)).get();
        String messageId = MessageClientIDSetter.createUniqID();
        SendMessageResponse sendResponse =
            this.blockingStub.sendMessage(this.buildSendMessageRequest(topic, messageId));
        this.assertSendMessage(sendResponse, messageId);

        this.sendClientSettings(this.stub, this.buildSimpleConsumerClientSettings(group)).get();
        AtomicInteger receiveMessageCount = new AtomicInteger(0);
        this.assertAndGetReceiveMessage(this.receiveMessage(this.blockingStub, topic, group), messageId);
        receiveMessageCount.incrementAndGet();

        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(group);
        defaultMQPullConsumer.start();
        try {
            org.apache.rocketmq.common.message.MessageQueue dlqMQ =
                new org.apache.rocketmq.common.message.MessageQueue(MixAll.getDLQTopic(group), BROKER1_NAME, 0);
            Awaitility.await().atMost(java.time.Duration.ofSeconds(30L)).until(() -> {
                try {
                    List<Message> messageList = this.getMessageFromReceiveMessageResponse(
                        this.receiveMessage(this.blockingStub, topic, group, 1));
                    receiveMessageCount.addAndGet(messageList.size());
                    PullResult pullResult = defaultMQPullConsumer.pull(dlqMQ, "*", 0L, 1);
                    if (!PullStatus.FOUND.equals(pullResult.getPullStatus())) {
                        return false;
                    }
                    MessageExt messageExt = pullResult.getMsgFoundList().get(0);
                    return messageId.equals(messageExt.getMsgId());
                } catch (Throwable ignored) {
                    // Deliberate best effort: any failure just means "not there yet"; poll again.
                    return false;
                }
            });
        } finally {
            // Release the pull consumer's client resources once the DLQ check is done.
            defaultMQPullConsumer.shutdown();
        }
        Assertions.assertThat(receiveMessageCount.get()).isEqualTo(maxDeliveryAttempts);
    }

    /**
     * Sends three FIFO messages in the same message group and verifies that an orderly consumer
     * group receives them in the order they were sent, acknowledging each one on receipt.
     *
     * @throws Exception if exchanging client settings, sending or receiving fails
     */
    public void testConsumeOrderly() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.FIFO);
        String group = MQRandomUtils.getRandomConsumerGroup();

        SubscriptionGroupConfig orderlyConfig =
            brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
        orderlyConfig.setConsumeMessageOrderly(true);
        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);
        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);
        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(orderlyConfig);

        // Receive once before sending so the consumer group has polled the topic.
        this.sendClientSettings(this.stub, this.buildPushConsumerClientSettings(group)).get();
        this.receiveMessage(this.blockingStub, topic, group, 1);

        final String messageGroup = "group";
        this.sendClientSettings(this.stub, this.buildProducerClientSettings(topic)).get();
        List<String> messageIdList = new ArrayList<>();
        for (int sent = 0; sent < 3; sent++) {
            String sentId = MessageClientIDSetter.createUniqID();
            messageIdList.add(sentId);
            SendMessageResponse sendResponse = this.blockingStub.sendMessage(
                this.buildSendOrderMessageRequest(topic, sentId, messageGroup));
            this.assertSendMessage(sendResponse, sentId);
        }

        List<String> messageRecvList = new ArrayList<>();
        this.sendClientSettings(this.stub, this.buildPushConsumerClientSettings(group)).get();
        Awaitility.await().atMost(java.time.Duration.ofSeconds(20L)).until(() -> {
            List<Message> received =
                this.getMessageFromReceiveMessageResponse(this.receiveMessage(this.blockingStub, topic, group));
            if (received.isEmpty()) {
                return false;
            }
            for (Message message : received) {
                SystemProperties properties = message.getSystemProperties();
                String receivedId = properties.getMessageId();
                messageRecvList.add(receivedId);
                // Acknowledge right away so the next message of the group can be delivered.
                AckMessageEntry ackEntry = AckMessageEntry.newBuilder()
                    .setMessageId(receivedId)
                    .setReceiptHandle(properties.getReceiptHandle())
                    .build();
                this.blockingStub.ackMessage(this.buildAckMessageRequest(topic, group, ackEntry));
            }
            return messageRecvList.size() == messageIdList.size();
        });

        for (int index = 0; index < messageIdList.size(); index++) {
            Assertions.assertThat(messageRecvList.get(index)).isEqualTo(messageIdList.get(index));
        }
    }

    /**
     * Receives messages with a 15-second call deadline.
     *
     * @param stub  blocking stub to issue the call on
     * @param topic topic to receive from
     * @param group consumer group to receive as
     * @return every response returned by the receive call
     */
    public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub, String topic, String group) {
        final int defaultTimeSeconds = 15;
        return this.receiveMessage(stub, topic, group, defaultTimeSeconds);
    }

    /**
     * Issues a receive-message call with the given deadline and collects every response of the
     * returned stream into a list.
     *
     * @param stub        blocking stub to issue the call on
     * @param topic       topic to receive from
     * @param group       consumer group to receive as
     * @param timeSeconds call deadline in seconds
     * @return every response returned by the receive call, in arrival order
     */
    public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub, String topic, String group, int timeSeconds) {
        Iterator<ReceiveMessageResponse> responses = stub
            .withDeadlineAfter(timeSeconds, TimeUnit.SECONDS)
            .receiveMessage(this.buildReceiveMessageRequest(topic, group));
        List<ReceiveMessageResponse> collected = new ArrayList<>();
        responses.forEachRemaining(collected::add);
        return collected;
    }

    /**
     * Extracts the messages from the responses that carry one, preserving their order.
     *
     * @param responseList responses of a receive-message call
     * @return the messages contained in {@code responseList}; empty if none carries a message
     */
    public List<Message> getMessageFromReceiveMessageResponse(List<ReceiveMessageResponse> responseList) {
        return responseList.stream()
            .filter(ReceiveMessageResponse::hasMessage)
            .map(ReceiveMessageResponse::getMessage)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds a route query for the given topic, using endpoints built from the test server's port.
     *
     * @param topic name of the topic to query
     * @return the route query request
     */
    public QueryRouteRequest buildQueryRouteRequest(String topic) {
        Resource topicResource = Resource.newBuilder().setName(topic).build();
        return QueryRouteRequest.newBuilder()
            .setEndpoints(this.buildEndpoints(this.port))
            .setTopic(topicResource)
            .build();
    }

    /**
     * Builds an assignment query for the given topic and consumer group, using endpoints built
     * from the test server's port.
     *
     * @param topic name of the topic to query
     * @param group name of the consumer group
     * @return the assignment query request
     */
    public QueryAssignmentRequest buildQueryAssignmentRequest(String topic, String group) {
        Resource topicResource = Resource.newBuilder().setName(topic).build();
        Resource groupResource = Resource.newBuilder().setName(group).build();
        return QueryAssignmentRequest.newBuilder()
            .setEndpoints(this.buildEndpoints(this.port))
            .setTopic(topicResource)
            .setGroup(groupResource)
            .build();
    }

    /**
     * Builds a request that sends a single NORMAL message with body {@code "123"} to queue 0.
     *
     * @param topic     name of the target topic
     * @param messageId id to assign to the message
     * @return the send request
     */
    public SendMessageRequest buildSendMessageRequest(String topic, String messageId) {
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(messageId)
            .setQueueId(0)
            .setMessageType(MessageType.NORMAL)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(Resource.newBuilder().setName(topic).build())
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a request that sends a single FIFO message with body {@code "123"} to queue 0.
     *
     * @param topic        name of the target topic
     * @param messageId    id to assign to the message
     * @param messageGroup message group the message belongs to
     * @return the send request
     */
    public SendMessageRequest buildSendOrderMessageRequest(String topic, String messageId, String messageGroup) {
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(messageId)
            .setQueueId(0)
            .setMessageType(MessageType.FIFO)
            .setMessageGroup(messageGroup)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(Resource.newBuilder().setName(topic).build())
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a request that sends a single NORMAL message to queue 0 whose body is a random
     * string of the requested length and whose body encoding is declared as GZIP.
     *
     * @param topic       name of the target topic
     * @param messageId   id to assign to the message
     * @param messageSize length passed to the random string generator for the body
     * @return the send request
     */
    public SendMessageRequest buildSendBigMessageRequest(String topic, String messageId, int messageSize) {
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(messageId)
            .setQueueId(0)
            .setMessageType(MessageType.NORMAL)
            .setBodyEncoding(Encoding.GZIP)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(Resource.newBuilder().setName(topic).build())
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8(RandomUtils.getStringWithCharacter(messageSize)))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a request that sends a single TRANSACTION message with body {@code "123"} to
     * queue 0, with an orphaned-transaction recovery duration of 10 seconds.
     *
     * @param topic     name of the target topic
     * @param messageId id to assign to the message
     * @return the send request
     */
    public SendMessageRequest buildTransactionSendMessageRequest(String topic, String messageId) {
        Duration recoveryDuration = Duration.newBuilder().setSeconds(10L).build();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(messageId)
            .setQueueId(0)
            .setMessageType(MessageType.TRANSACTION)
            .setOrphanedTransactionRecoveryDuration(recoveryDuration)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(Resource.newBuilder().setName(topic).build())
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a receive request for one message from the given topic on behalf of a group.
     *
     * <p>The request uses message queue id {@code -1}, a batch size of 1, auto-renew disabled and
     * an invisible duration of 3 seconds.
     *
     * @param topic name of the topic to receive from
     * @param group name of the consumer group
     * @return the assembled receive request
     */
    public ReceiveMessageRequest buildReceiveMessageRequest(String topic, String group) {
        Resource groupResource = Resource.newBuilder().setName(group).build();
        Resource topicResource = Resource.newBuilder().setName(topic).build();

        MessageQueue messageQueue = MessageQueue.newBuilder()
            .setTopic(topicResource)
            .setId(-1)
            .build();
        Duration invisibleDuration = Duration.newBuilder().setSeconds(3L).build();

        return ReceiveMessageRequest.newBuilder()
            .setGroup(groupResource)
            .setMessageQueue(messageQueue)
            .setBatchSize(1)
            .setAutoRenew(false)
            .setInvisibleDuration(invisibleDuration)
            .build();
    }

    /**
     * Builds an acknowledge request for the given entries.
     *
     * @param topic name of the topic the entries belong to
     * @param group name of the consumer group acknowledging the entries
     * @param entry entries to acknowledge, added to the request in the order given
     * @return the assembled acknowledge request
     */
    public AckMessageRequest buildAckMessageRequest(String topic, String group, AckMessageEntry ... entry) {
        // Arrays.asList gives a typed List<AckMessageEntry> view of the varargs array, so no raw
        // Iterable cast (and no intermediate stream/collect) is needed to feed addAllEntries.
        return AckMessageRequest.newBuilder()
            .setGroup(Resource.newBuilder().setName(group).build())
            .setTopic(Resource.newBuilder().setName(topic).build())
            .addAllEntries(Arrays.asList(entry))
            .build();
    }

    /**
     * Builds an end-transaction request with the source fixed to
     * {@code TransactionSource.SOURCE_SERVER_CHECK}.
     *
     * @param topic name of the topic the transactional message was sent to
     * @param messageId id of the transactional message
     * @param transactionId id of the transaction to resolve
     * @param resolution resolution to apply to the transaction
     * @return the assembled end-transaction request
     */
    public EndTransactionRequest buildEndTransactionRequest(String topic, String messageId, String transactionId, TransactionResolution resolution) {
        Resource topicResource = Resource.newBuilder().setName(topic).build();
        return EndTransactionRequest.newBuilder()
            .setMessageId(messageId)
            .setTopic(topicResource)
            .setTransactionId(transactionId)
            .setResolution(resolution)
            .setSource(TransactionSource.SOURCE_SERVER_CHECK)
            .build();
    }

    /**
     * Builds a request that changes the invisible duration of a received message.
     *
     * @param topic name of the topic the message belongs to
     * @param group name of the consumer group that received the message
     * @param receiptHandle receipt handle identifying the received message
     * @param second new invisible duration, in seconds
     * @return the assembled change-invisible-duration request
     */
    public ChangeInvisibleDurationRequest buildChangeInvisibleDurationRequest(String topic, String group, String receiptHandle, int second) {
        Resource topicResource = Resource.newBuilder().setName(topic).build();
        Resource groupResource = Resource.newBuilder().setName(group).build();
        return ChangeInvisibleDurationRequest.newBuilder()
            .setTopic(topicResource)
            .setGroup(groupResource)
            .setInvisibleDuration(Durations.fromSeconds(second))
            .setReceiptHandle(receiptHandle)
            .build();
    }

    /**
     * Asserts that a query-route response is OK and points at the proxy's gRPC port.
     *
     * <p>Checks, in order: the status equals the one {@code ResponseBuilder} builds for
     * {@code Code.OK}; the number of message queues equals {@code messageQueueSize}; and the port
     * of the first address of the first queue's broker equals the configured gRPC server port.
     *
     * @param response response to verify
     * @param messageQueueSize expected number of message queues
     */
    public void assertQueryRoute(QueryRouteResponse response, int messageQueueSize) {
        Assertions.assertThat(response.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));

        int actualQueueCount = response.getMessageQueuesList().size();
        Assertions.assertThat(actualQueueCount).isEqualTo(messageQueueSize);

        // Only the first queue's first broker address is inspected.
        Address firstAddress = response.getMessageQueues(0).getBroker().getEndpoints().getAddresses(0);
        Assertions.assertThat(firstAddress.getPort())
            .isEqualTo(ConfigurationManager.getProxyConfig().getGrpcServerPort());
    }

    /**
     * Asserts that a query-assignment response is OK and points at the proxy's gRPC port.
     *
     * <p>Checks, in order: the status equals the one {@code ResponseBuilder} builds for
     * {@code Code.OK}; the number of assignments equals {@code assignmentCount}; and the port of
     * the first address of the first assignment's broker equals the configured gRPC server port.
     *
     * @param response response to verify
     * @param assignmentCount expected number of assignments
     */
    public void assertQueryAssignment(QueryAssignmentResponse response, int assignmentCount) {
        Assertions.assertThat(response.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));

        int actualAssignmentCount = response.getAssignmentsCount();
        Assertions.assertThat(actualAssignmentCount).isEqualTo(assignmentCount);

        // Only the first assignment's first broker address is inspected.
        Address firstAddress = response.getAssignments(0).getMessageQueue().getBroker().getEndpoints().getAddresses(0);
        Assertions.assertThat(firstAddress.getPort())
            .isEqualTo(ConfigurationManager.getProxyConfig().getGrpcServerPort());
    }

    /**
     * Asserts that a send response is OK and that its first entry carries the expected message id.
     *
     * @param response response to verify
     * @param messageId message id expected on the first result entry
     */
    public void assertSendMessage(SendMessageResponse response, String messageId) {
        Assertions.assertThat(response.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));

        String actualMessageId = response.getEntries(0).getMessageId();
        Assertions.assertThat(actualMessageId).isEqualTo(messageId);
    }

    /**
     * Verifies a receive-message response list and returns the delivered message.
     *
     * <p>The element at index 0 must carry a status whose code is {@code Code.OK}; the element at
     * index 1 must carry a message whose id equals {@code messageId}.
     *
     * @param response responses to verify; indexes 0 and 1 are accessed
     * @param messageId message id expected on the message at index 1
     * @return the message taken from the element at index 1
     */
    public Message assertAndGetReceiveMessage(List<ReceiveMessageResponse> response, String messageId) {
        ReceiveMessageResponse statusResponse = response.get(0);
        Assertions.assertThat(statusResponse.hasStatus()).isTrue();
        Assertions.assertThat(statusResponse.getStatus().getCode()).isEqualTo(Code.OK);

        // The second element is fetched only after the status checks have passed.
        Message receivedMessage = response.get(1).getMessage();
        Assertions.assertThat(receivedMessage.getSystemProperties().getMessageId()).isEqualTo(messageId);
        return receivedMessage;
    }

    /**
     * Asserts that a recover-orphaned-transaction command carries a non-blank transaction id.
     *
     * @param command command to verify
     * @param messageId not used by this method
     */
    public void assertRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand command, String messageId) {
        String transactionId = command.getTransactionId();
        Assertions.assertThat(transactionId).isNotBlank();
    }

    /**
     * Asserts that an end-transaction response has status code {@code Code.OK}.
     *
     * @param response response to verify
     */
    public void assertEndTransactionResponse(EndTransactionResponse response) {
        Code actualCode = response.getStatus().getCode();
        Assertions.assertThat(actualCode).isEqualTo(Code.OK);
    }

    /**
     * Asserts that a change-invisible-duration response is OK and returned a different receipt
     * handle than the previous one.
     *
     * @param response response to verify
     * @param prevHandle receipt handle the response's handle must differ from
     */
    public void assertChangeInvisibleDurationResponse(ChangeInvisibleDurationResponse response, String prevHandle) {
        Code actualCode = response.getStatus().getCode();
        Assertions.assertThat(actualCode).isEqualTo(Code.OK);

        String newHandle = response.getReceiptHandle();
        Assertions.assertThat(newHandle).isNotEqualTo(prevHandle);
    }

    /**
     * Builds IPv4 endpoints with a single address on host {@code 127.0.0.1}.
     *
     * @param port port of the single address
     * @return endpoints containing exactly one address
     */
    public Endpoints buildEndpoints(int port) {
        Address loopbackAddress = Address.newBuilder()
            .setHost("127.0.0.1")
            .setPort(port)
            .build();
        return Endpoints.newBuilder()
            .setScheme(AddressScheme.IPv4)
            .addAddresses(loopbackAddress)
            .build();
    }

    /**
     * Builds client settings for a simple consumer subscribed under the given group.
     *
     * <p>The request timeout is fixed at 3 seconds.
     *
     * @param group name of the consumer group
     * @return the assembled settings
     */
    public Settings buildSimpleConsumerClientSettings(String group) {
        Resource groupResource = Resource.newBuilder().setName(group).build();
        Subscription subscription = Subscription.newBuilder().setGroup(groupResource).build();
        return Settings.newBuilder()
            .setClientType(ClientType.SIMPLE_CONSUMER)
            .setRequestTimeout(Durations.fromSeconds(3L))
            .setSubscription(subscription)
            .build();
    }

    /**
     * Builds push-consumer client settings for the given group with a maximum of 2 delivery
     * attempts.
     *
     * @param group name of the consumer group
     * @return the assembled settings
     */
    public Settings buildPushConsumerClientSettings(String group) {
        final int defaultMaxDeliveryAttempts = 2;
        return buildPushConsumerClientSettings(defaultMaxDeliveryAttempts, group);
    }

    /**
     * Builds push-consumer client settings for the given group.
     *
     * <p>The request timeout is fixed at 3 seconds; the backoff policy carries only the maximum
     * number of attempts.
     *
     * @param maxDeliveryAttempts value written to the backoff policy's max attempts
     * @param group name of the consumer group
     * @return the assembled settings
     */
    public Settings buildPushConsumerClientSettings(int maxDeliveryAttempts, String group) {
        RetryPolicy backoffPolicy = RetryPolicy.newBuilder()
            .setMaxAttempts(maxDeliveryAttempts)
            .build();
        Resource groupResource = Resource.newBuilder().setName(group).build();
        Subscription subscription = Subscription.newBuilder().setGroup(groupResource).build();
        return Settings.newBuilder()
            .setClientType(ClientType.PUSH_CONSUMER)
            .setRequestTimeout(Durations.fromSeconds(3L))
            .setBackoffPolicy(backoffPolicy)
            .setSubscription(subscription)
            .build();
    }

    /**
     * Builds producer client settings that publish to the given topics.
     *
     * @param topics names of the topics to publish to, added in the order given
     * @return the assembled settings
     */
    public Settings buildProducerClientSettings(String ... topics) {
        // Typed as List<Resource> (not a raw List) so addAllTopics is a checked call.
        List<Resource> topicResources = Arrays.stream(topics)
            .map(topic -> Resource.newBuilder().setName(topic).build())
            .collect(Collectors.toList());
        Publishing publishing = Publishing.newBuilder().addAllTopics(topicResources).build();
        return Settings.newBuilder()
            .setClientType(ClientType.PRODUCER)
            .setPublishing(publishing)
            .build();
    }

    protected static class DefaultTelemetryCommandStreamObserver
    implements StreamObserver<TelemetryCommand> {
        protected DefaultTelemetryCommandStreamObserver() {
        }

        public void onNext(TelemetryCommand value) {
        }

        public void onError(Throwable t) {
        }

        public void onCompleted() {
        }
    }
}

