package org.apache.rocketmq.mqtt.cs.session.infly;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/infly/PushAction.class */
public class PushAction {
    private static Logger logger = LoggerFactory.getLogger(PushAction.class);

    @Resource
    private MqttMsgId mqttMsgId;

    @Resource
    private RetryDriver retryDriver;

    @Resource
    private InFlyCache inFlyCache;

    @Resource
    private ConnectConf connectConf;

    public void messageArrive(Session session, Subscription subscription, Queue queue) {
        Message nextSendMessageByOrder;
        if (session == null) {
            return;
        }
        if (this.connectConf.isOrder()) {
            if (this.retryDriver.needRetryBefore(subscription, queue, session) || (nextSendMessageByOrder = session.nextSendMessageByOrder(subscription, queue)) == null) {
                return;
            }
            push(nextSendMessageByOrder, subscription, session, queue);
            return;
        }
        List<Message> pendMessageList = session.pendMessageList(subscription, queue);
        if (pendMessageList == null || pendMessageList.isEmpty()) {
            return;
        }
        for (Message message : pendMessageList) {
            message.setAck(0);
            push(message, subscription, session, queue);
        }
    }

    public void push(Message message, Subscription subscription, Session session, Queue queue) {
        int nextId = this.mqttMsgId.nextId(session.getClientId());
        this.inFlyCache.getPendingDownCache().put(session.getChannelId(), nextId, subscription, queue, message);
        try {
            if (session.isClean() && message.getStoreTimestamp() > 0 && message.getStoreTimestamp() < session.getStartTime()) {
                logger.warn("old msg:{},{},{},{}", new Object[]{session.getClientId(), message.getMsgId(), Long.valueOf(message.getStoreTimestamp()), Long.valueOf(session.getStartTime())});
                rollNext(session, nextId);
                return;
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        int qos = subscription.getQos();
        if (subscription.isP2p() && message.qos() != null) {
            qos = message.qos().intValue();
        }
        if (qos == 0) {
            write(session, message, nextId, 0, subscription);
            rollNextByAck(session, nextId);
        } else {
            this.retryDriver.mountPublish(nextId, message, subscription.getQos(), ChannelInfo.getId(session.getChannel()), subscription);
            write(session, message, nextId, qos, subscription);
        }
    }

    public void write(Session session, Message message, int i, int i2, Subscription subscription) {
        Channel channel = session.getChannel();
        ChannelInfo.getOwner(channel);
        String clientId = session.getClientId();
        String originTopic = message.getOriginTopic();
        String userProperty = message.getUserProperty(Message.extPropertyMqttRealTopic);
        if (StringUtils.isNotBlank(userProperty)) {
            originTopic = userProperty;
        }
        if (StringUtils.isBlank(originTopic)) {
            originTopic = message.getFirstTopic();
        }
        TopicUtils.isP2P(TopicUtils.decode(originTopic).getSecondTopic());
        if (!channel.isWritable()) {
            logger.error("UnWritable:{}", clientId);
            return;
        }
        ChannelFuture writeAndFlush = session.getChannel().writeAndFlush(MessageUtil.toMqttMessage(originTopic, message.getPayload(), i2, i));
        int length = message.getPayload() != null ? message.getPayload().length : 0;
        writeAndFlush.addListener(channelFuture -> {
            if (subscription.isRetry()) {
                message.setRetry(message.getRetry() + 1);
                logger.warn("retryPush:{},{},{}", new Object[]{session.getClientId(), message.getMsgId(), Integer.valueOf(message.getRetry())});
            }
        });
    }

    public void rollNextByAck(Session session, int i) {
        if (session == null) {
            return;
        }
        this.mqttMsgId.releaseId(i, session.getClientId());
        if (this.inFlyCache.getPendingDownCache().get(session.getChannelId(), i) == null) {
            return;
        }
        rollNext(session, i);
    }

    public void rollNext(Session session, int i) {
        if (session == null) {
            return;
        }
        this.mqttMsgId.releaseId(i, session.getClientId());
        InFlyCache.PendingDown remove = this.inFlyCache.getPendingDownCache().remove(session.getChannelId(), i);
        if (remove == null) {
            return;
        }
        _rollNext(session, remove);
    }

    public void rollNextNoWaitRetry(Session session, int i) {
        InFlyCache.PendingDown pendingDown;
        if (session == null || session.isDestroyed() || (pendingDown = this.inFlyCache.getPendingDownCache().get(session.getChannelId(), i)) == null) {
            return;
        }
        _rollNext(session, pendingDown);
    }

    public void _rollNext(Session session, InFlyCache.PendingDown pendingDown) {
        Subscription subscription = pendingDown.getSubscription();
        Queue queue = pendingDown.getQueue();
        long seqId = pendingDown.getSeqId();
        if (!this.connectConf.isOrder()) {
            session.ack(subscription, queue, seqId);
            return;
        }
        Message rollNext = session.rollNext(subscription, queue, seqId);
        if (rollNext != null) {
            push(rollNext, subscription, session, queue);
        }
    }
}
