package org.apache.rocketmq.mqtt.ds.upstream.processor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.meta.WildcardManager;
import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
import org.springframework.stereotype.Component;

/**
 * Upstream processor for MQTT PUBLISH packets.
 *
 * <p>The published topic is normalized, its first-level topic is checked via
 * {@link FirstTopicManager}, the target queue names are resolved through
 * {@link WildcardManager}, and the converted message is handed to
 * {@link LmqQueueStore}. The store result is serialized to JSON bytes and
 * wrapped in a {@link HookResult}.
 */
@Component
public class PublishProcessor implements UpstreamProcessor {

    @Resource
    private LmqQueueStore lmqQueueStore;

    @Resource
    private WildcardManager wildcardManager;

    @Resource
    private FirstTopicManager firstTopicManager;

    /**
     * Stores an incoming publish message into every queue matching its topic.
     *
     * @param mqttMessageUpContext upstream context; its namespace is used when matching queues
     * @param mqttMessage          the incoming packet, cast to {@link MqttPublishMessage}
     * @return a future yielding a {@link HookResult} built with code 200, a null
     *         second argument, and the JSON-serialized store result as payload
     */
    @Override
    public CompletableFuture<HookResult> process(MqttMessageUpContext mqttMessageUpContext, MqttMessage mqttMessage) {
        MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
        // Generate the id up front, before any topic validation takes place.
        String msgId = MessageClientIDSetter.createUniqID();

        String pubTopic = TopicUtils.normalizeTopic(publish.variableHeader().topicName());
        firstTopicManager.checkFirstTopicIfCreated(TopicUtils.decode(pubTopic).getFirstTopic());

        Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, mqttMessageUpContext.getNamespace());
        Message outbound = buildMessage(publish, msgId);

        return lmqQueueStore.putMessage(queueNames, outbound).thenCompose(storeResult ->
                HookResult.newHookResult(200, (String) null, JSON.toJSONBytes(storeResult, new SerializerFeature[0])));
    }

    /** Converts the publish packet to a store message, stamping it with the given id and the current time. */
    private Message buildMessage(MqttPublishMessage publish, String msgId) {
        Message converted = MessageUtil.toMessage(publish);
        converted.setMsgId(msgId);
        converted.setBornTimestamp(System.currentTimeMillis());
        return converted;
    }
}
