package org.apache.rocketmq.test.grpc.v2;

import apache.rocketmq.v2.AckMessageEntry;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.AckMessageResultEntry;
import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Encoding;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.MessagingServiceGrpc;
import apache.rocketmq.v2.Publishing;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.RetryPolicy;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SystemProperties;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.TransactionResolution;
import apache.rocketmq.v2.TransactionSource;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.net.URL;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.RandomUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;

/* loaded from: input_file:org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.class */
public class GrpcBaseIT extends BaseConf {
    /** Blocking client stub; assigned by {@code setUpServer} once the test server is running. */
    protected MessagingServiceGrpc.MessagingServiceBlockingStub blockingStub;
    /** Asynchronous client stub; assigned by {@code setUpServer} alongside {@link #blockingStub}. */
    protected MessagingServiceGrpc.MessagingServiceStub stub;
    // Not referenced in the visible part of this class; presumably the default queue count
    // used when creating test topics -- confirm against callers.
    protected static final int DEFAULT_QUEUE_NUMS = 8;
    /** Port of the test gRPC server; stays 0 until {@code setUpServer} records the bound port. */
    private int port = 0;

    /** JUnit rule with which {@code setUpServer} and {@code createChannel} register the server and channels they create. */
    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
    /** Request headers attached to every stub built by {@code createStub}/{@code createBlockingStub}; filled in by {@code setUp}. */
    protected final Metadata header = new Metadata();

    /**
     * {@link StreamObserver} for telemetry responses whose callbacks all do nothing.
     * Tests subclass it anonymously and override only the callbacks they need.
     */
    protected static class DefaultTelemetryCommandStreamObserver implements StreamObserver<TelemetryCommand> {
        protected DefaultTelemetryCommandStreamObserver() {
        }

        /** Ignores the received command. */
        @Override
        public void onNext(TelemetryCommand telemetryCommand) {
        }

        /** Ignores the stream failure. */
        @Override
        public void onError(Throwable th) {
        }

        /** Ignores stream completion. */
        @Override
        public void onCompleted() {
        }
    }

    /**
     * Prepares the shared state used by the gRPC integration tests: sets the transaction check
     * interval on the three brokers, fills the client headers, points {@code RMQ_PROXY_HOME} at
     * the {@code rmq-proxy-home} classpath resource (or a mock path when it is absent) and
     * initialises and adjusts the proxy configuration.
     *
     * @throws Exception if the resource URL cannot be converted or the configuration fails to load
     */
    public void setUp() throws Exception {
        final long transactionCheckIntervalMillis = 3000L;
        brokerController1.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);
        brokerController2.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);
        brokerController3.getBrokerConfig().setTransactionCheckInterval(transactionCheckIntervalMillis);

        this.header.put(InterceptorConstants.CLIENT_ID, "client-id" + UUID.randomUUID());
        this.header.put(InterceptorConstants.LANGUAGE, "JAVA");

        // Resolve the proxy home directory, falling back to a mock location when the resource is missing.
        URL proxyHomeUrl = getClass().getClassLoader().getResource("rmq-proxy-home");
        String proxyHome;
        if (proxyHomeUrl == null) {
            proxyHome = "/mock/rmq/proxy/home";
        } else {
            proxyHome = proxyHomeUrl.toURI().getPath();
        }
        if (proxyHome != null) {
            System.setProperty("RMQ_PROXY_HOME", proxyHome);
        }

        ConfigurationManager.initEnv();
        ConfigurationManager.intConfig();
        ConfigurationManager.getProxyConfig().setNamesrvAddr(NAMESRV_ADDR);
        ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500L);
        ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
        ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
        ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3L);
        ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0L);
    }

    /**
     * Creates an asynchronous messaging stub on the given channel that attaches
     * {@link #header} to every call.
     *
     * @param channel channel the stub sends its calls over
     * @return the asynchronous stub with the header-attaching interceptor installed
     */
    protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel channel) {
        ClientInterceptor attachHeaders = MetadataUtils.newAttachHeadersInterceptor(this.header);
        return MessagingServiceGrpc.newStub(channel).withInterceptors(attachHeaders);
    }

    /**
     * Creates a blocking messaging stub on the given channel that attaches
     * {@link #header} to every call.
     *
     * @param channel channel the stub sends its calls over
     * @return the blocking stub with the header-attaching interceptor installed
     */
    protected MessagingServiceGrpc.MessagingServiceBlockingStub createBlockingStub(Channel channel) {
        ClientInterceptor attachHeaders = MetadataUtils.newAttachHeadersInterceptor(this.header);
        return MessagingServiceGrpc.newBlockingStub(channel).withInterceptors(attachHeaders);
    }

    /**
     * Opens a telemetry stream, sends the given client settings and returns a future that is
     * completed with the settings carried by the first {@code SETTINGS} command received back.
     * The request side of the stream is closed as soon as the future completes.
     *
     * <p>If the stream fails before any settings arrive, the future is completed exceptionally
     * with that failure, so callers blocking on {@code get()} fail instead of waiting forever.
     *
     * @param messagingServiceStub asynchronous stub used to open the telemetry stream
     * @param settings             client settings to send
     * @return future holding the settings received on the stream, or the stream failure
     */
    protected CompletableFuture<Settings> sendClientSettings(MessagingServiceGrpc.MessagingServiceStub messagingServiceStub, Settings settings) {
        final CompletableFuture<Settings> completableFuture = new CompletableFuture<>();
        final StreamObserver<TelemetryCommand> telemetry = messagingServiceStub.telemetry(new DefaultTelemetryCommandStreamObserver() {
            @Override
            public void onNext(TelemetryCommand telemetryCommand) {
                if (TelemetryCommand.CommandCase.SETTINGS.equals(telemetryCommand.getCommandCase())) {
                    completableFuture.complete(telemetryCommand.getSettings());
                }
            }

            @Override
            public void onError(Throwable th) {
                // No effect when the settings were already delivered; otherwise unblocks waiters.
                completableFuture.completeExceptionally(th);
            }
        });
        telemetry.onNext(TelemetryCommand.newBuilder().setSettings(settings).build());
        // Anything thrown by onCompleted() here only fails the discarded dependent future,
        // never the future handed back to the caller.
        completableFuture.whenComplete((ignoredSettings, ignoredError) -> telemetry.onCompleted());
        return completableFuture;
    }

    /**
     * Starts a TLS-enabled gRPC server for the given service using a freshly generated
     * self-signed certificate, records the bound port in this instance and in the proxy
     * configuration, and creates {@link #blockingStub} and {@link #stub} on new channels
     * to that port. The server is registered with {@link #grpcCleanup}.
     *
     * @param messagingServiceImplBase service implementation to expose
     * @param i                        port to bind the server to
     * @param z                        when {@code true}, the service is wrapped with
     *                                 {@link ContextInterceptor} and {@link HeaderInterceptor}
     * @throws IOException          if the server cannot be started
     * @throws CertificateException if the self-signed certificate cannot be generated
     */
    public void setUpServer(MessagingServiceGrpc.MessagingServiceImplBase messagingServiceImplBase, int i, boolean z) throws IOException, CertificateException {
        SelfSignedCertificate certificate = new SelfSignedCertificate();

        ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(messagingServiceImplBase);
        if (z) {
            serviceDefinition = ServerInterceptors.intercept(messagingServiceImplBase, new ContextInterceptor(), new HeaderInterceptor());
        }

        Server server = NettyServerBuilder.forPort(i)
            .directExecutor()
            .addService(serviceDefinition)
            .useTransportSecurity(certificate.certificate(), certificate.privateKey())
            .build()
            .start();
        this.port = server.getPort();
        this.grpcCleanup.register(server);

        ConfigurationManager.getProxyConfig().setGrpcServerPort(this.port);
        int configuredPort = ConfigurationManager.getProxyConfig().getGrpcServerPort();
        // Each stub gets its own channel.
        this.blockingStub = createBlockingStub(createChannel(configuredPort));
        this.stub = createStub(createChannel(configuredPort));
    }

    /**
     * Creates a TLS channel to {@code 127.0.0.1} on the given port and registers it with
     * {@link #grpcCleanup}. The client uses the OpenSSL provider, trusts any server
     * certificate ({@link InsecureTrustManagerFactory}) and negotiates {@code h2} via ALPN.
     *
     * @param i port of the server to connect to
     * @return the registered channel
     * @throws SSLException if the client SSL context cannot be built
     */
    protected Channel createChannel(int i) throws SSLException {
        NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress("127.0.0.1", i).directExecutor();

        ApplicationProtocolConfig alpnConfig = new ApplicationProtocolConfig(
            ApplicationProtocolConfig.Protocol.ALPN,
            ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
            ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
            "h2");
        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
            .sslProvider(SslProvider.OPENSSL)
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .applicationProtocolConfig(alpnConfig);

        channelBuilder.sslContext(sslContextBuilder.build());
        return this.grpcCleanup.register(channelBuilder.build());
    }

    /**
     * Sends a transactional message, waits for the server to push a
     * {@code RECOVER_ORPHANED_TRANSACTION_COMMAND} over the telemetry stream, commits the
     * transaction in response and verifies that the message then becomes receivable.
     *
     * <p>Steps: register push-consumer settings and wait for the settings echo; issue a short
     * receive; register producer settings and wait for the echo while heart-beating; send the
     * transactional message; heart-beat until the recover command arrives; end the transaction
     * with {@code COMMIT}; poll until the committed message is received. The telemetry stream
     * is always completed on exit.
     */
    public void testTransactionCheckThenCommit() {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.TRANSACTION);
        String group = MQRandomUtils.getRandomConsumerGroup();
        // Most recent command pushed by the server; written from the stream callback.
        final AtomicReference<TelemetryCommand> latestCommand = new AtomicReference<>();
        // Recover command captured at the moment the wait condition matched, so a later
        // command overwriting latestCommand cannot change what is asserted below.
        final AtomicReference<RecoverOrphanedTransactionCommand> matchedRecoverCommand = new AtomicReference<>();
        StreamObserver<TelemetryCommand> telemetry = this.stub.telemetry(new DefaultTelemetryCommandStreamObserver() {
            @Override
            public void onNext(TelemetryCommand telemetryCommand) {
                latestCommand.set(telemetryCommand);
            }
        });
        try {
            telemetry.onNext(TelemetryCommand.newBuilder().setSettings(buildPushConsumerClientSettings(group)).build());
            Awaitility.await().atMost(Duration.ofSeconds(3L)).until(() -> {
                // Read the reference once so the null check and the case check see the same command.
                TelemetryCommand command = latestCommand.get();
                return command != null && command.getCommandCase() == TelemetryCommand.CommandCase.SETTINGS;
            });
            latestCommand.set(null);

            receiveMessage(this.blockingStub, topic, group, 1);
            telemetry.onNext(TelemetryCommand.newBuilder().setSettings(buildProducerClientSettings(topic)).build());
            this.blockingStub.heartbeat(buildHeartbeatRequest(group));
            Awaitility.await().atMost(Duration.ofSeconds(3L)).until(() -> {
                TelemetryCommand command = latestCommand.get();
                if (command != null && command.getCommandCase() == TelemetryCommand.CommandCase.SETTINGS) {
                    return true;
                }
                // Keep heart-beating while waiting for the settings echo.
                this.blockingStub.heartbeat(buildHeartbeatRequest(group));
                return false;
            });
            latestCommand.set(null);

            String messageId = MessageClientIDSetter.createUniqID();
            assertSendMessage(this.blockingStub.sendMessage(buildTransactionSendMessageRequest(topic, messageId)), messageId);
            Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> {
                TelemetryCommand command = latestCommand.get();
                if (command != null && command.getCommandCase() == TelemetryCommand.CommandCase.RECOVER_ORPHANED_TRANSACTION_COMMAND) {
                    matchedRecoverCommand.set(command.getRecoverOrphanedTransactionCommand());
                    return true;
                }
                // Keep heart-beating while waiting for the transaction check.
                this.blockingStub.heartbeat(buildHeartbeatRequest(group));
                return false;
            });
            RecoverOrphanedTransactionCommand recoverCommand = matchedRecoverCommand.get();
            assertRecoverOrphanedTransactionCommand(recoverCommand, messageId);
            assertEndTransactionResponse(this.blockingStub.endTransaction(
                buildEndTransactionRequest(topic, messageId, recoverCommand.getTransactionId(), TransactionResolution.COMMIT)));

            telemetry.onNext(TelemetryCommand.newBuilder().setSettings(buildPushConsumerClientSettings(group)).build());
            Awaitility.await().atMost(Duration.ofSeconds(30L)).until(() -> {
                List<Message> received = getMessageFromReceiveMessageResponse(receiveMessage(this.blockingStub, topic, group));
                if (received.isEmpty()) {
                    return false;
                }
                return received.get(0).getSystemProperties().getMessageId().equals(messageId);
            });
        } finally {
            telemetry.onCompleted();
        }
    }

    /**
     * Builds a heartbeat request for the given consumer group.
     *
     * @param str consumer group name
     * @return heartbeat request whose group resource carries that name
     */
    public HeartbeatRequest buildHeartbeatRequest(String str) {
        Resource group = Resource.newBuilder().setName(str).build();
        return HeartbeatRequest.newBuilder().setGroup(group).build();
    }

    /**
     * Sends a message with a delivery timestamp five seconds in the future to a DELAY topic and
     * verifies that a simple consumer receives it, and that the time between sending and
     * receiving is within two seconds of the requested delay.
     *
     * @throws Exception if exchanging client settings fails
     */
    public void testSimpleConsumerSendAndRecvDelayMessage() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.DELAY);
        String group = MQRandomUtils.getRandomConsumerGroup();
        long delayMillis = TimeUnit.SECONDS.toMillis(5L);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        receiveMessage(this.blockingStub, topic, group, 1);
        sendClientSettings(this.stub, buildProducerClientSettings(topic)).get();

        String messageId = MessageClientIDSetter.createUniqID();
        SendMessageResponse sendResponse = this.blockingStub.sendMessage(SendMessageRequest.newBuilder()
            .addMessages(Message.newBuilder()
                .setTopic(Resource.newBuilder().setName(topic).build())
                .setSystemProperties(SystemProperties.newBuilder()
                    .setMessageId(messageId)
                    .setQueueId(0)
                    .setMessageType(MessageType.NORMAL)
                    .setBodyEncoding(Encoding.GZIP)
                    .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
                    .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
                    .setDeliveryTimestamp(Timestamps.fromMillis(System.currentTimeMillis() + delayMillis))
                    .build())
                .setBody(ByteString.copyFromUtf8("hello"))
                .build())
            .build());
        long sendTimeMillis = System.currentTimeMillis();
        assertSendMessage(sendResponse, messageId);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        // Wall-clock time of the most recent non-empty receive.
        AtomicLong receiveTimeMillis = new AtomicLong();
        Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
            List<Message> received = getMessageFromReceiveMessageResponse(receiveMessage(this.blockingStub, topic, group));
            if (received.isEmpty()) {
                return false;
            }
            receiveTimeMillis.set(System.currentTimeMillis());
            return received.get(0).getSystemProperties().getMessageId().equals(messageId);
        });
        Assertions.assertThat(Math.abs((receiveTimeMillis.get() - sendTimeMillis) - delayMillis) < 2000).isTrue();
    }

    /**
     * Sends a 4096-character message and verifies that a simple consumer receives it with
     * GZIP body encoding and a body of the same size.
     *
     * @throws Exception if exchanging client settings fails
     */
    public void testSimpleConsumerSendAndRecvBigMessage() throws Exception {
        final int bodySize = 4096;
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        receiveMessage(this.blockingStub, topic, group, 1);
        sendClientSettings(this.stub, buildProducerClientSettings(topic)).get();

        String messageId = MessageClientIDSetter.createUniqID();
        SendMessageResponse sendResponse = this.blockingStub.sendMessage(buildSendBigMessageRequest(topic, messageId, bodySize));
        assertSendMessage(sendResponse, messageId);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        List<ReceiveMessageResponse> responses = receiveMessage(this.blockingStub, topic, group);
        Message received = assertAndGetReceiveMessage(responses, messageId);
        Assertions.assertThat(received.getSystemProperties().getBodyEncoding()).isEqualTo(Encoding.GZIP);
        Assertions.assertThat(received.getBody().size()).isEqualTo(bodySize);
    }

    /**
     * Sends one message, receives it with a simple consumer, changes its invisible duration,
     * receives it a second time and finally acknowledges it with both the changed receipt
     * handle and the one from the second delivery. The ack response is expected to report
     * {@code MULTIPLE_RESULTS} with exactly one {@code OK} entry and one
     * {@code INVALID_RECEIPT_HANDLE} entry.
     *
     * @throws Exception if exchanging client settings fails
     */
    public void testSimpleConsumerSendAndRecv() throws Exception {
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        receiveMessage(this.blockingStub, topic, group, 1);
        sendClientSettings(this.stub, buildProducerClientSettings(topic)).get();

        String messageId = MessageClientIDSetter.createUniqID();
        assertSendMessage(this.blockingStub.sendMessage(buildSendMessageRequest(topic, messageId)), messageId);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        Message firstDelivery = assertAndGetReceiveMessage(receiveMessage(this.blockingStub, topic, group), messageId);
        String firstHandle = firstDelivery.getSystemProperties().getReceiptHandle();
        ChangeInvisibleDurationResponse changeResponse =
            this.blockingStub.changeInvisibleDuration(buildChangeInvisibleDurationRequest(topic, group, firstHandle, 5));
        assertChangeInvisibleDurationResponse(changeResponse, firstHandle);

        // Index 0: handle returned by the duration change; index 1: handle of the redelivery.
        List<String> receiptHandles = new ArrayList<>();
        receiptHandles.add(changeResponse.getReceiptHandle());
        Awaitility.await().atMost(Duration.ofSeconds(20L)).until(() -> {
            List<Message> received = getMessageFromReceiveMessageResponse(receiveMessage(this.blockingStub, topic, group));
            if (received.isEmpty()) {
                return false;
            }
            SystemProperties properties = received.get(0).getSystemProperties();
            if (!properties.getMessageId().equals(messageId)) {
                return false;
            }
            receiptHandles.add(properties.getReceiptHandle());
            return true;
        });
        Assertions.assertThat(receiptHandles.size()).isEqualTo(2);

        AckMessageEntry changedHandleEntry = AckMessageEntry.newBuilder().setMessageId(messageId).setReceiptHandle(receiptHandles.get(0)).build();
        AckMessageEntry redeliveryHandleEntry = AckMessageEntry.newBuilder().setMessageId(messageId).setReceiptHandle(receiptHandles.get(1)).build();
        AckMessageResponse ackResponse = this.blockingStub.ackMessage(buildAckMessageRequest(topic, group, changedHandleEntry, redeliveryHandleEntry));
        Assertions.assertThat(ackResponse.getStatus().getCode()).isEqualTo(Code.MULTIPLE_RESULTS);

        int okCount = 0;
        int invalidHandleCount = 0;
        for (AckMessageResultEntry resultEntry : ackResponse.getEntriesList()) {
            switch (resultEntry.getStatus().getCode()) {
                case OK:
                    okCount++;
                    break;
                case INVALID_RECEIPT_HANDLE:
                    invalidHandleCount++;
                    break;
                default:
                    break;
            }
        }
        Assertions.assertThat(okCount).isEqualTo(1);
        Assertions.assertThat(invalidHandleCount).isEqualTo(1);
    }

    /**
     * Verifies that a message which is received but never acknowledged ends up in the
     * consumer group's dead-letter topic once the configured number of delivery attempts
     * is used up, and that it was delivered exactly that many times.
     *
     * <p>The pull consumer used to inspect the dead-letter queue is shut down when the test
     * finishes, whether it passes or fails.
     *
     * @throws Exception if exchanging client settings or starting the pull consumer fails
     */
    public void testSimpleConsumerToDLQ() throws Exception {
        final int maxDeliveryAttempt = 2;
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME);
        String group = MQRandomUtils.getRandomConsumerGroup();

        SubscriptionGroupConfig groupConfig = brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
        // One first delivery plus (maxDeliveryAttempt - 1) retries.
        groupConfig.setRetryMaxTimes(maxDeliveryAttempt - 1);
        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        receiveMessage(this.blockingStub, topic, group, 1);
        sendClientSettings(this.stub, buildProducerClientSettings(topic)).get();

        String messageId = MessageClientIDSetter.createUniqID();
        assertSendMessage(this.blockingStub.sendMessage(buildSendMessageRequest(topic, messageId)), messageId);

        sendClientSettings(this.stub, buildSimpleConsumerClientSettings(group)).get();
        AtomicInteger deliveryCount = new AtomicInteger(0);
        assertAndGetReceiveMessage(receiveMessage(this.blockingStub, topic, group), messageId);
        deliveryCount.incrementAndGet();

        DefaultMQPullConsumer dlqConsumer = new DefaultMQPullConsumer(group);
        dlqConsumer.start();
        try {
            MessageQueue dlqQueue = new MessageQueue(MixAll.getDLQTopic(group), BROKER1_NAME, 0);
            Awaitility.await().atMost(Duration.ofSeconds(30L)).until(() -> {
                try {
                    List<Message> redelivered = getMessageFromReceiveMessageResponse(receiveMessage(this.blockingStub, topic, group, 1));
                    deliveryCount.addAndGet(redelivered.size());
                    PullResult pullResult = dlqConsumer.pull(dlqQueue, "*", 0L, 1);
                    if (PullStatus.FOUND.equals(pullResult.getPullStatus())) {
                        return messageId.equals(((MessageExt) pullResult.getMsgFoundList().get(0)).getMsgId());
                    }
                    return false;
                } catch (Throwable ignored) {
                    // Best-effort polling: any failure just means "not there yet"; retry until timeout.
                    return false;
                }
            });
            Assertions.assertThat(deliveryCount.get()).isEqualTo(maxDeliveryAttempt);
        } finally {
            dlqConsumer.shutdown();
        }
    }

    /**
     * Sends three FIFO messages in the same message group to a group configured for orderly
     * consumption, receives and acknowledges them, and verifies that they arrive in the
     * order in which they were sent.
     *
     * @throws Exception if exchanging client settings fails
     */
    public void testConsumeOrderly() throws Exception {
        final int messageCount = 3;
        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.FIFO);
        String group = MQRandomUtils.getRandomConsumerGroup();

        SubscriptionGroupConfig groupConfig = brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
        groupConfig.setConsumeMessageOrderly(true);
        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);

        sendClientSettings(this.stub, buildPushConsumerClientSettings(group)).get();
        receiveMessage(this.blockingStub, topic, group, 1);
        sendClientSettings(this.stub, buildProducerClientSettings(topic)).get();

        List<String> sentIds = new ArrayList<>();
        while (sentIds.size() < messageCount) {
            String messageId = MessageClientIDSetter.createUniqID();
            sentIds.add(messageId);
            assertSendMessage(this.blockingStub.sendMessage(buildSendOrderMessageRequest(topic, messageId, "group")), messageId);
        }

        List<String> receivedIds = new ArrayList<>();
        sendClientSettings(this.stub, buildPushConsumerClientSettings(group)).get();
        Awaitility.await().atMost(Duration.ofSeconds(20L)).until(() -> {
            List<Message> batch = getMessageFromReceiveMessageResponse(receiveMessage(this.blockingStub, topic, group));
            if (batch.isEmpty()) {
                return false;
            }
            for (Message message : batch) {
                String receivedId = message.getSystemProperties().getMessageId();
                receivedIds.add(receivedId);
                AckMessageEntry ackEntry = AckMessageEntry.newBuilder()
                    .setMessageId(receivedId)
                    .setReceiptHandle(message.getSystemProperties().getReceiptHandle())
                    .build();
                this.blockingStub.ackMessage(buildAckMessageRequest(topic, group, ackEntry));
            }
            return receivedIds.size() == sentIds.size();
        });

        // Compare position by position: arrival order must match sending order.
        int position = 0;
        for (String expectedId : sentIds) {
            Assertions.assertThat(receivedIds.get(position)).isEqualTo(expectedId);
            position++;
        }
    }

    /**
     * Receives messages with a call deadline of 15 seconds.
     *
     * @param messagingServiceBlockingStub stub used for the receive call
     * @param str                          topic name
     * @param str2                         consumer group name
     * @return every response returned by the receive call
     */
    public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub messagingServiceBlockingStub, String str, String str2) {
        final int deadlineSeconds = 15;
        return receiveMessage(messagingServiceBlockingStub, str, str2, deadlineSeconds);
    }

    /**
     * Issues a receive call with the given deadline and drains the response stream.
     *
     * @param messagingServiceBlockingStub stub used for the receive call
     * @param str                          topic name
     * @param str2                         consumer group name
     * @param i                            call deadline in seconds
     * @return every response returned by the receive call, in arrival order
     */
    public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub messagingServiceBlockingStub, String str, String str2, int i) {
        List<ReceiveMessageResponse> responses = new ArrayList<>();
        messagingServiceBlockingStub
            .withDeadlineAfter(i, TimeUnit.SECONDS)
            .receiveMessage(buildReceiveMessageRequest(str, str2))
            .forEachRemaining(responses::add);
        return responses;
    }

    /**
     * Extracts the messages carried by a list of receive responses, skipping responses that
     * do not contain a message.
     *
     * @param list receive responses to scan
     * @return the contained messages, in the order of their responses
     */
    public List<Message> getMessageFromReceiveMessageResponse(List<ReceiveMessageResponse> list) {
        return list.stream()
            .filter(ReceiveMessageResponse::hasMessage)
            .map(ReceiveMessageResponse::getMessage)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds a route query for the given topic, using endpoints built from this test's port.
     *
     * @param str topic name
     * @return the route query request
     */
    public QueryRouteRequest buildQueryRouteRequest(String str) {
        QueryRouteRequest.Builder request = QueryRouteRequest.newBuilder();
        request.setEndpoints(buildEndpoints(this.port));
        request.setTopic(Resource.newBuilder().setName(str).build());
        return request.build();
    }

    /**
     * Builds an assignment query for the given topic and consumer group, using endpoints
     * built from this test's port.
     *
     * @param str  topic name
     * @param str2 consumer group name
     * @return the assignment query request
     */
    public QueryAssignmentRequest buildQueryAssignmentRequest(String str, String str2) {
        QueryAssignmentRequest.Builder request = QueryAssignmentRequest.newBuilder();
        request.setEndpoints(buildEndpoints(this.port));
        request.setTopic(Resource.newBuilder().setName(str).build());
        request.setGroup(Resource.newBuilder().setName(str2).build());
        return request.build();
    }

    /**
     * Builds a send request carrying one NORMAL message with body {@code "123"} addressed
     * to queue 0 of the given topic.
     *
     * @param str  topic name
     * @param str2 message id
     * @return the send request
     */
    public SendMessageRequest buildSendMessageRequest(String str, String str2) {
        Resource topic = Resource.newBuilder().setName(str).build();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(str2)
            .setQueueId(0)
            .setMessageType(MessageType.NORMAL)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(topic)
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a send request carrying one FIFO message with body {@code "123"} that belongs
     * to the given message group, addressed to queue 0 of the given topic.
     *
     * @param str  topic name
     * @param str2 message id
     * @param str3 message group
     * @return the send request
     */
    public SendMessageRequest buildSendOrderMessageRequest(String str, String str2, String str3) {
        Resource topic = Resource.newBuilder().setName(str).build();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(str2)
            .setQueueId(0)
            .setMessageType(MessageType.FIFO)
            .setMessageGroup(str3)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(topic)
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a send request carrying one NORMAL message, flagged with GZIP body encoding,
     * whose body is a string produced by {@code RandomUtils.getStringWithCharacter(i)}.
     *
     * @param str  topic name
     * @param str2 message id
     * @param i    length argument passed to the random string generator
     * @return the send request
     */
    public SendMessageRequest buildSendBigMessageRequest(String str, String str2, int i) {
        Resource topic = Resource.newBuilder().setName(str).build();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(str2)
            .setQueueId(0)
            .setMessageType(MessageType.NORMAL)
            .setBodyEncoding(Encoding.GZIP)
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        ByteString body = ByteString.copyFromUtf8(RandomUtils.getStringWithCharacter(i));
        Message message = Message.newBuilder()
            .setTopic(topic)
            .setSystemProperties(systemProperties)
            .setBody(body)
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a send request carrying one TRANSACTION message with body {@code "123"} and an
     * orphaned-transaction recovery duration of ten seconds.
     *
     * @param str  topic name
     * @param str2 message id
     * @return the send request
     */
    public SendMessageRequest buildTransactionSendMessageRequest(String str, String str2) {
        Resource topic = Resource.newBuilder().setName(str).build();
        SystemProperties systemProperties = SystemProperties.newBuilder()
            .setMessageId(str2)
            .setQueueId(0)
            .setMessageType(MessageType.TRANSACTION)
            .setOrphanedTransactionRecoveryDuration(Durations.fromSeconds(10L))
            .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
            .build();
        Message message = Message.newBuilder()
            .setTopic(topic)
            .setSystemProperties(systemProperties)
            .setBody(ByteString.copyFromUtf8("123"))
            .build();
        return SendMessageRequest.newBuilder().addMessages(message).build();
    }

    /**
     * Builds a receive request for one message from the given topic (queue id {@code -1})
     * on behalf of the given group, with auto-renew disabled and an invisible duration of
     * three seconds.
     *
     * @param str  topic name
     * @param str2 consumer group name
     * @return the receive request
     */
    public ReceiveMessageRequest buildReceiveMessageRequest(String str, String str2) {
        Resource group = Resource.newBuilder().setName(str2).build();
        apache.rocketmq.v2.MessageQueue queue = apache.rocketmq.v2.MessageQueue.newBuilder()
            .setTopic(Resource.newBuilder().setName(str).build())
            .setId(-1)
            .build();
        return ReceiveMessageRequest.newBuilder()
            .setGroup(group)
            .setMessageQueue(queue)
            .setBatchSize(1)
            .setAutoRenew(false)
            .setInvisibleDuration(Durations.fromSeconds(3L))
            .build();
    }

    /**
     * Builds an acknowledge request for the given topic and group containing the supplied
     * entries in the order given.
     *
     * @param str                topic name
     * @param str2               consumer group name
     * @param ackMessageEntryArr entries to acknowledge
     * @return the acknowledge request
     */
    public AckMessageRequest buildAckMessageRequest(String str, String str2, AckMessageEntry... ackMessageEntryArr) {
        Resource group = Resource.newBuilder().setName(str2).build();
        Resource topic = Resource.newBuilder().setName(str).build();
        List<AckMessageEntry> entries = Arrays.asList(ackMessageEntryArr);
        return AckMessageRequest.newBuilder()
            .setGroup(group)
            .setTopic(topic)
            .addAllEntries(entries)
            .build();
    }

    /**
     * Builds an end-transaction request whose source is {@code SOURCE_SERVER_CHECK}.
     *
     * @param str                   topic name
     * @param str2                  message id
     * @param str3                  transaction id
     * @param transactionResolution resolution to apply to the transaction
     * @return the end-transaction request
     */
    public EndTransactionRequest buildEndTransactionRequest(String str, String str2, String str3, TransactionResolution transactionResolution) {
        Resource topic = Resource.newBuilder().setName(str).build();
        EndTransactionRequest.Builder request = EndTransactionRequest.newBuilder();
        request.setMessageId(str2);
        request.setTopic(topic);
        request.setTransactionId(str3);
        request.setResolution(transactionResolution);
        request.setSource(TransactionSource.SOURCE_SERVER_CHECK);
        return request.build();
    }

    /**
     * Builds a {@link ChangeInvisibleDurationRequest} for a previously obtained receipt handle.
     *
     * @param str  name of the topic set on the request
     * @param str2 name of the group set on the request
     * @param str3 receipt handle set on the request
     * @param i    invisible duration, in seconds
     * @return the assembled request
     */
    public ChangeInvisibleDurationRequest buildChangeInvisibleDurationRequest(String str, String str2, String str3, int i) {
        Resource topic = Resource.newBuilder().setName(str).build();
        Resource group = Resource.newBuilder().setName(str2).build();
        return ChangeInvisibleDurationRequest.newBuilder()
            .setTopic(topic)
            .setGroup(group)
            .setInvisibleDuration(Durations.fromSeconds(i))
            .setReceiptHandle(str3)
            .build();
    }

    /**
     * Asserts that a route query succeeded and returned the expected number of message queues.
     * <p>
     * Also checks that the first address of the first message queue's broker endpoints uses the
     * gRPC server port from the proxy configuration. The first message queue is read
     * unconditionally, so the response must contain at least one.
     *
     * @param queryRouteResponse response to verify
     * @param i                  expected number of message queues
     */
    public void assertQueryRoute(QueryRouteResponse queryRouteResponse, int i) {
        Assertions.assertThat(queryRouteResponse.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));
        Assertions.assertThat(queryRouteResponse.getMessageQueuesList().size()).isEqualTo(i);

        Address firstAddress = queryRouteResponse.getMessageQueues(0)
            .getBroker()
            .getEndpoints()
            .getAddresses(0);
        Assertions.assertThat(firstAddress.getPort())
            .isEqualTo(ConfigurationManager.getProxyConfig().getGrpcServerPort());
    }

    /**
     * Asserts that an assignment query succeeded and returned the expected number of assignments.
     * <p>
     * Also checks that the first address of the first assignment's broker endpoints uses the gRPC
     * server port from the proxy configuration. The first assignment is read unconditionally, so
     * the response must contain at least one.
     *
     * @param queryAssignmentResponse response to verify
     * @param i                       expected number of assignments
     */
    public void assertQueryAssignment(QueryAssignmentResponse queryAssignmentResponse, int i) {
        Assertions.assertThat(queryAssignmentResponse.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));
        Assertions.assertThat(queryAssignmentResponse.getAssignmentsCount()).isEqualTo(i);

        Address firstAddress = queryAssignmentResponse.getAssignments(0)
            .getMessageQueue()
            .getBroker()
            .getEndpoints()
            .getAddresses(0);
        Assertions.assertThat(firstAddress.getPort())
            .isEqualTo(ConfigurationManager.getProxyConfig().getGrpcServerPort());
    }

    /**
     * Asserts that a send succeeded and that the first result entry carries the expected message id.
     *
     * @param sendMessageResponse response to verify
     * @param str                 expected message id of the first entry
     */
    public void assertSendMessage(SendMessageResponse sendMessageResponse, String str) {
        Assertions.assertThat(sendMessageResponse.getStatus())
            .isEqualTo(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));

        String firstMessageId = sendMessageResponse.getEntries(0).getMessageId();
        Assertions.assertThat(firstMessageId).isEqualTo(str);
    }

    /**
     * Verifies a list of receive responses and returns the message of the second response.
     * <p>
     * The first response must carry a status with code {@link Code#OK}; the second response must
     * carry a message whose message id equals {@code str}. The list size is asserted before each
     * index is read, so a short list is reported as an assertion failure instead of an
     * {@link IndexOutOfBoundsException}.
     *
     * @param list responses to verify; at least two elements are required
     * @param str  expected message id of the message in the second response
     * @return the message taken from the second response
     */
    public Message assertAndGetReceiveMessage(List<ReceiveMessageResponse> list, String str) {
        Assertions.assertThat(list).isNotEmpty();
        ReceiveMessageResponse statusResponse = list.get(0);
        Assertions.assertThat(statusResponse.hasStatus()).isTrue();
        Assertions.assertThat(statusResponse.getStatus().getCode()).isEqualTo(Code.OK);

        // Checked after the status so that a non-OK status is reported first when only the status entry arrived.
        Assertions.assertThat(list.size()).as("number of receive responses").isGreaterThan(1);
        Message receivedMessage = list.get(1).getMessage();
        Assertions.assertThat(receivedMessage.getSystemProperties().getMessageId()).isEqualTo(str);
        return receivedMessage;
    }

    /**
     * Asserts that the command carries a non-blank transaction id.
     *
     * @param recoverOrphanedTransactionCommand command to verify
     * @param str                               not read by this method
     */
    public void assertRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand, String str) {
        String transactionId = recoverOrphanedTransactionCommand.getTransactionId();
        Assertions.assertThat(transactionId).isNotBlank();
    }

    /**
     * Asserts that the end-transaction response has status code {@link Code#OK}.
     *
     * @param endTransactionResponse response to verify
     */
    public void assertEndTransactionResponse(EndTransactionResponse endTransactionResponse) {
        Code statusCode = endTransactionResponse.getStatus().getCode();
        Assertions.assertThat(statusCode).isEqualTo(Code.OK);
    }

    /**
     * Asserts that the response has status code {@link Code#OK} and that its receipt handle
     * is not equal to the given one.
     *
     * @param changeInvisibleDurationResponse response to verify
     * @param str                             receipt handle the response's handle must differ from
     */
    public void assertChangeInvisibleDurationResponse(ChangeInvisibleDurationResponse changeInvisibleDurationResponse, String str) {
        Code statusCode = changeInvisibleDurationResponse.getStatus().getCode();
        Assertions.assertThat(statusCode).isEqualTo(Code.OK);

        String returnedHandle = changeInvisibleDurationResponse.getReceiptHandle();
        Assertions.assertThat(returnedHandle).isNotEqualTo(str);
    }

    /**
     * Builds IPv4 {@link Endpoints} with a single address on host {@code 127.0.0.1}.
     *
     * @param i port of the single address
     * @return the assembled endpoints
     */
    public Endpoints buildEndpoints(int i) {
        Address loopbackAddress = Address.newBuilder()
            .setHost("127.0.0.1")
            .setPort(i)
            .build();
        return Endpoints.newBuilder()
            .setScheme(AddressScheme.IPv4)
            .addAddresses(loopbackAddress)
            .build();
    }

    /**
     * Builds client {@link Settings} of type {@link ClientType#SIMPLE_CONSUMER} with a request
     * timeout of 3 seconds and a subscription for the given group.
     *
     * @param str name of the group set on the subscription
     * @return the assembled settings
     */
    public Settings buildSimpleConsumerClientSettings(String str) {
        Resource group = Resource.newBuilder().setName(str).build();
        Subscription subscription = Subscription.newBuilder().setGroup(group).build();
        return Settings.newBuilder()
            .setClientType(ClientType.SIMPLE_CONSUMER)
            .setRequestTimeout(Durations.fromSeconds(3L))
            .setSubscription(subscription)
            .build();
    }

    /**
     * Builds push-consumer client settings for the given group by delegating to
     * {@link #buildPushConsumerClientSettings(int, String)} with 2 as the first argument.
     *
     * @param str name of the group passed through to the delegate
     * @return the settings produced by the delegate
     */
    public Settings buildPushConsumerClientSettings(String str) {
        final int defaultMaxAttempts = 2;
        return buildPushConsumerClientSettings(defaultMaxAttempts, str);
    }

    /**
     * Builds client {@link Settings} of type {@link ClientType#PUSH_CONSUMER} with a request
     * timeout of 3 seconds, a backoff policy limited to {@code i} attempts and a subscription
     * for the given group.
     *
     * @param i   maximum attempts set on the backoff {@link RetryPolicy}
     * @param str name of the group set on the subscription
     * @return the assembled settings
     */
    public Settings buildPushConsumerClientSettings(int i, String str) {
        RetryPolicy backoffPolicy = RetryPolicy.newBuilder().setMaxAttempts(i).build();
        Resource group = Resource.newBuilder().setName(str).build();
        Subscription subscription = Subscription.newBuilder().setGroup(group).build();
        return Settings.newBuilder()
            .setClientType(ClientType.PUSH_CONSUMER)
            .setRequestTimeout(Durations.fromSeconds(3L))
            .setBackoffPolicy(backoffPolicy)
            .setSubscription(subscription)
            .build();
    }

    /**
     * Builds client {@link Settings} of type {@link ClientType#PRODUCER} that publish to the
     * given topics.
     *
     * @param strArr names of the topics added to the publishing settings, in the given order
     * @return the assembled settings
     */
    public Settings buildProducerClientSettings(String... strArr) {
        // Keep the element type explicit so addAllTopics is called without a raw List cast.
        List<Resource> topics = Arrays.stream(strArr)
            .map(topicName -> Resource.newBuilder().setName(topicName).build())
            .collect(Collectors.toList());
        return Settings.newBuilder()
            .setClientType(ClientType.PRODUCER)
            .setPublishing(Publishing.newBuilder().addAllTopics(topics).build())
            .build();
    }
}
