package org.apache.rocketmq.proxy.grpc.v2.consumer;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.FilterExpression;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.Settings;
import com.google.protobuf.util.Durations;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.QueueSelector;
import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueSelector;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;

/* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.class */
public class ReceiveMessageActivity extends AbstractMessingActivity {
    protected ReceiptHandleProcessor receiptHandleProcessor;
    private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";

    /* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity$ReceiveMessageQueueSelector.class */
    protected static class ReceiveMessageQueueSelector implements QueueSelector {
        private final String brokerName;

        public ReceiveMessageQueueSelector(String str) {
            this.brokerName = str;
        }

        @Override // org.apache.rocketmq.proxy.processor.QueueSelector
        public AddressableMessageQueue select(ProxyContext proxyContext, MessageQueueView messageQueueView) {
            try {
                AddressableMessageQueue addressableMessageQueue = null;
                MessageQueueSelector readSelector = messageQueueView.getReadSelector();
                if (StringUtils.isNotBlank(this.brokerName)) {
                    addressableMessageQueue = readSelector.getQueueByBrokerName(this.brokerName);
                }
                if (addressableMessageQueue == null) {
                    addressableMessageQueue = readSelector.selectOne(true);
                }
                return addressableMessageQueue;
            } catch (Throwable th) {
                return null;
            }
        }
    }

    public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
        super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
        this.receiptHandleProcessor = receiptHandleProcessor;
    }

    public void receiveMessage(ProxyContext proxyContext, ReceiveMessageRequest receiveMessageRequest, StreamObserver<ReceiveMessageResponse> streamObserver) {
        ReceiveMessageResponseStreamWriter createWriter = createWriter(proxyContext, streamObserver);
        try {
            Settings clientSettings = this.grpcClientSettingsManager.getClientSettings(proxyContext);
            boolean fifo = clientSettings.getSubscription().getFifo();
            int maxAttempts = clientSettings.getBackoffPolicy().getMaxAttempts();
            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
            Long remainingMs = proxyContext.getRemainingMs();
            long millis = receiveMessageRequest.hasLongPollingTimeout() ? Durations.toMillis(receiveMessageRequest.getLongPollingTimeout()) : remainingMs.longValue() - (Durations.toMillis(clientSettings.getRequestTimeout()) / 2);
            if (millis < proxyConfig.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
                millis = proxyConfig.getGrpcClientConsumerMinLongPollingTimeoutMillis();
            }
            if (millis > proxyConfig.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
                millis = proxyConfig.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
            }
            if (millis > remainingMs.longValue()) {
                if (remainingMs.longValue() < proxyConfig.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
                    String clientVersion = proxyContext.getClientVersion();
                    createWriter.writeAndComplete(proxyContext, (null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0) ? Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME, "The deadline time remaining is not enough for polling, please check network condition");
                    return;
                }
                millis = remainingMs.longValue();
            }
            validateTopicAndConsumerGroup(receiveMessageRequest.getMessageQueue().getTopic(), receiveMessageRequest.getGroup());
            String wrapResourceWithNamespace = GrpcConverter.getInstance().wrapResourceWithNamespace(receiveMessageRequest.getMessageQueue().getTopic());
            String wrapResourceWithNamespace2 = GrpcConverter.getInstance().wrapResourceWithNamespace(receiveMessageRequest.getGroup());
            long millis2 = Durations.toMillis(receiveMessageRequest.getInvisibleDuration());
            ProxyConfig proxyConfig2 = ConfigurationManager.getProxyConfig();
            if (proxyConfig2.isEnableProxyAutoRenew() && receiveMessageRequest.getAutoRenew()) {
                millis2 = proxyConfig2.getDefaultInvisibleTimeMills();
            } else {
                validateInvisibleTime(millis2, ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
            }
            FilterExpression filterExpression = receiveMessageRequest.getFilterExpression();
            try {
                this.messagingProcessor.popMessage(proxyContext, new ReceiveMessageQueueSelector(receiveMessageRequest.getMessageQueue().getBroker().getName()), wrapResourceWithNamespace2, wrapResourceWithNamespace, receiveMessageRequest.getBatchSize(), millis2, millis, 1, FilterAPI.build(wrapResourceWithNamespace, filterExpression.getExpression(), GrpcConverter.getInstance().buildExpressionType(filterExpression.getType())), fifo, new PopMessageResultFilterImpl(maxAttempts), remainingMs.longValue()).thenAccept(popResult -> {
                    if (proxyConfig2.isEnableProxyAutoRenew() && receiveMessageRequest.getAutoRenew() && PopStatus.FOUND.equals(popResult.getPopStatus())) {
                        for (MessageExt messageExt : popResult.getMsgFoundList()) {
                            String property = messageExt.getProperty("POP_CK");
                            if (property != null) {
                                this.receiptHandleProcessor.addReceiptHandle(this.grpcChannelManager.getChannel(proxyContext.getClientID()), wrapResourceWithNamespace2, messageExt.getMsgId(), property, new MessageReceiptHandle(wrapResourceWithNamespace2, wrapResourceWithNamespace, messageExt.getQueueId(), property, messageExt.getMsgId(), messageExt.getQueueOffset(), messageExt.getReconsumeTimes()));
                            }
                        }
                    }
                    createWriter.writeAndComplete(proxyContext, receiveMessageRequest, popResult);
                }).exceptionally(th -> {
                    createWriter.writeAndComplete(proxyContext, receiveMessageRequest, th);
                    return null;
                });
            } catch (Exception e) {
                createWriter.writeAndComplete(proxyContext, Code.ILLEGAL_FILTER_EXPRESSION, e.getMessage());
            }
        } catch (Throwable th2) {
            createWriter.writeAndComplete(proxyContext, receiveMessageRequest, th2);
        }
    }

    protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext proxyContext, StreamObserver<ReceiveMessageResponse> streamObserver) {
        return new ReceiveMessageResponseStreamWriter(this.messagingProcessor, streamObserver);
    }
}
