package org.apache.rocketmq.proxy.processor;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;

/* loaded from: input_file:org/apache/rocketmq/proxy/processor/ProducerProcessor.class */
public class ProducerProcessor extends AbstractProcessor {
    private static final Logger log = LoggerFactory.getLogger("RocketmqProxy");
    private final ExecutorService executor;
    private final TopicMessageTypeValidator topicMessageTypeValidator;

    public ProducerProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager, ExecutorService executorService) {
        super(messagingProcessor, serviceManager);
        this.executor = executorService;
        this.topicMessageTypeValidator = new DefaultTopicMessageTypeValidator();
    }

    public CompletableFuture<List<SendResult>> sendMessage(ProxyContext proxyContext, QueueSelector queueSelector, String str, int i, List<Message> list, long j) {
        AddressableMessageQueue select;
        CompletableFuture completableFuture = new CompletableFuture();
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Message message = list.get(0);
            String topic = message.getTopic();
            if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() && this.topicMessageTypeValidator != null && !NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
                this.topicMessageTypeValidator.validate(this.serviceManager.getMetadataService().getTopicMessageType(proxyContext, topic), TopicMessageType.parseFromMessageProperty(message.getProperties()));
            }
            select = queueSelector.select(proxyContext, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(proxyContext, topic));
        } catch (Throwable th) {
            completableFuture.completeExceptionally(th);
        }
        if (select == null) {
            throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
        }
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            MessageClientIDSetter.setUniqID(it.next());
        }
        SendMessageRequestHeader buildSendMessageRequestHeader = buildSendMessageRequestHeader(list, str, i, select.getQueueId());
        completableFuture = this.serviceManager.getMessageService().sendMessage(proxyContext, select, list, buildSendMessageRequestHeader, j).thenApplyAsync(list2 -> {
            Iterator it2 = list2.iterator();
            while (it2.hasNext()) {
                SendResult sendResult = (SendResult) it2.next();
                int transactionValue = MessageSysFlag.getTransactionValue(buildSendMessageRequestHeader.getSysFlag().intValue());
                if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) && transactionValue == 4 && StringUtils.isNotBlank(sendResult.getTransactionId())) {
                    fillTransactionData(proxyContext, str, select, sendResult, list);
                }
            }
            return list2;
        }, (Executor) this.executor).whenComplete((BiConsumer<? super U, ? super Throwable>) (list3, th2) -> {
            long currentTimeMillis2 = System.currentTimeMillis();
            if (th2 != null) {
                this.serviceManager.getTopicRouteService().updateFaultItem(select.getBrokerName(), currentTimeMillis2 - currentTimeMillis, true, false);
            } else {
                this.serviceManager.getTopicRouteService().updateFaultItem(select.getBrokerName(), currentTimeMillis2 - currentTimeMillis, false, true);
            }
        });
        return FutureUtils.addExecutor(completableFuture, this.executor);
    }

    protected void fillTransactionData(ProxyContext proxyContext, String str, AddressableMessageQueue addressableMessageQueue, SendResult sendResult, List<Message> list) {
        try {
            this.serviceManager.getTransactionService().addTransactionDataByBrokerName(proxyContext, addressableMessageQueue.getBrokerName(), str, sendResult.getQueueOffset(), (sendResult.getOffsetMsgId() != null ? MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()) : MessageDecoder.decodeMessageId(sendResult.getMsgId())).getOffset(), sendResult.getTransactionId(), list.get(0));
        } catch (Throwable th) {
            log.warn("fillTransactionData failed. messageQueue: {}, sendResult: {}", new Object[]{addressableMessageQueue, sendResult, th});
        }
    }

    protected SendMessageRequestHeader buildSendMessageRequestHeader(List<Message> list, String str, int i, int i2) {
        SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader();
        Message message = list.get(0);
        sendMessageRequestHeader.setProducerGroup(str);
        sendMessageRequestHeader.setTopic(message.getTopic());
        sendMessageRequestHeader.setDefaultTopic("TBW102");
        sendMessageRequestHeader.setDefaultTopicQueueNums(4);
        sendMessageRequestHeader.setQueueId(Integer.valueOf(i2));
        sendMessageRequestHeader.setSysFlag(Integer.valueOf(i));
        try {
            sendMessageRequestHeader.setBornTimestamp(Long.valueOf(Long.parseLong(message.getProperty("BORN_TIMESTAMP"))));
        } catch (Exception e) {
            log.warn("parse born time error, with value:{}", message.getProperty("BORN_TIMESTAMP"));
            sendMessageRequestHeader.setBornTimestamp(Long.valueOf(System.currentTimeMillis()));
        }
        sendMessageRequestHeader.setFlag(Integer.valueOf(message.getFlag()));
        sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
        sendMessageRequestHeader.setReconsumeTimes(0);
        if (list.size() > 1) {
            sendMessageRequestHeader.setBatch(true);
        }
        if (sendMessageRequestHeader.getTopic().startsWith("%RETRY%")) {
            String reconsumeTime = MessageAccessor.getReconsumeTime(message);
            if (reconsumeTime != null) {
                sendMessageRequestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTime));
                MessageAccessor.clearProperty(message, "RECONSUME_TIME");
            }
            String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(message);
            if (maxReconsumeTimes != null) {
                sendMessageRequestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                MessageAccessor.clearProperty(message, "MAX_RECONSUME_TIMES");
            }
        }
        return sendMessageRequestHeader;
    }

    public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, String str2, String str3, long j) {
        CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
        try {
        } catch (Throwable th) {
            completableFuture.completeExceptionally(th);
        }
        if (receiptHandle.getCommitLogOffset() < 0) {
            throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "commit log offset is empty");
        }
        ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = new ConsumerSendMsgBackRequestHeader();
        consumerSendMsgBackRequestHeader.setOffset(Long.valueOf(receiptHandle.getCommitLogOffset()));
        consumerSendMsgBackRequestHeader.setGroup(str2);
        consumerSendMsgBackRequestHeader.setDelayLevel(-1);
        consumerSendMsgBackRequestHeader.setOriginMsgId(str);
        consumerSendMsgBackRequestHeader.setOriginTopic(receiptHandle.getRealTopic(str3, str2));
        consumerSendMsgBackRequestHeader.setMaxReconsumeTimes(0);
        completableFuture = this.serviceManager.getMessageService().sendMessageBack(proxyContext, receiptHandle, str, consumerSendMsgBackRequestHeader, j).whenCompleteAsync((remotingCommand, th2) -> {
            if (th2 == null && remotingCommand.getCode() == 0) {
                this.messagingProcessor.ackMessage(proxyContext, receiptHandle, str, str2, str3, j);
            }
        }, (Executor) this.executor);
        return FutureUtils.addExecutor(completableFuture, this.executor);
    }
}
