package net.dreamlu.iot.mqtt.core.server.support;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttConnectPayload;
import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.codec.MqttConnectVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttFixedHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttMessageFactory;
import net.dreamlu.iot.mqtt.codec.MqttMessageIdVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription;
import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;

/* loaded from: input_file:net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.class */
public class DefaultMqttServerProcessor implements MqttServerProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 120000;
    private static final long KEEP_ALIVE_UNIT = 2000;
    private final long heartbeatTimeout;
    private final IMqttMessageStore messageStore;
    private final IMqttSessionManager sessionManager;
    private final IMqttServerAuthHandler authHandler;
    private final IMqttServerUniqueIdService uniqueIdService;
    private final IMqttServerSubscribeValidator subscribeValidator;
    private final IMqttServerPublishPermission publishPermission;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttConnectStatusListener connectStatusListener;
    private final IMqttMessageListener messageListener;
    private final String nodeName;
    private final ScheduledThreadPoolExecutor executor;

    /* renamed from: net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor$1, reason: invalid class name */
    /* loaded from: input_file:net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS = new int[MqttQoS.values().length];

        static {
            try {
                $SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS[MqttQoS.FAILURE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public DefaultMqttServerProcessor(MqttServerCreator mqttServerCreator, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
        this.heartbeatTimeout = mqttServerCreator.getHeartbeatTimeout() == null ? DEFAULT_HEARTBEAT_TIMEOUT : mqttServerCreator.getHeartbeatTimeout().longValue();
        this.messageStore = mqttServerCreator.getMessageStore();
        this.sessionManager = mqttServerCreator.getSessionManager();
        this.authHandler = mqttServerCreator.getAuthHandler();
        this.uniqueIdService = mqttServerCreator.getUniqueIdService();
        this.subscribeValidator = mqttServerCreator.getSubscribeValidator();
        this.publishPermission = mqttServerCreator.getPublishPermission();
        this.messageDispatcher = mqttServerCreator.getMessageDispatcher();
        this.connectStatusListener = mqttServerCreator.getConnectStatusListener();
        this.messageListener = mqttServerCreator.getMessageListener();
        this.nodeName = mqttServerCreator.getNodeName();
        this.executor = scheduledThreadPoolExecutor;
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processConnect(ChannelContext channelContext, MqttConnectMessage mqttConnectMessage) {
        MqttConnectPayload payload = mqttConnectMessage.payload();
        String clientIdentifier = payload.clientIdentifier();
        String userName = payload.userName();
        String password = payload.password();
        String uniqueId = this.uniqueIdService.getUniqueId(channelContext, clientIdentifier, userName, password);
        if (StrUtil.isBlank(uniqueId)) {
            connAckByReturnCode(clientIdentifier, uniqueId, channelContext, MqttConnectReasonCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }
        if (!this.authHandler.verifyAuthenticate(channelContext, uniqueId, clientIdentifier, userName, password)) {
            connAckByReturnCode(clientIdentifier, uniqueId, channelContext, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }
        ChannelContext byBsId = Tio.getByBsId(channelContext.getTioConfig(), uniqueId);
        if (byBsId != null) {
            Tio.unbindBsId(byBsId);
            Tio.remove(byBsId, String.format("uniqueId:[%s] clientId:[%s] now bind on new context id:[%s]", uniqueId, clientIdentifier, channelContext.getId()));
            cleanSession(uniqueId);
        }
        sendConnected(channelContext, uniqueId);
        Tio.bindBsId(channelContext, uniqueId);
        channelContext.set(MqttConst.USER_NAME_KEY, userName);
        MqttConnectVariableHeader variableHeader = mqttConnectMessage.variableHeader();
        int keepAliveTimeSeconds = variableHeader.keepAliveTimeSeconds();
        if (keepAliveTimeSeconds > 0 && this.heartbeatTimeout != keepAliveTimeSeconds * KEEP_ALIVE_UNIT) {
            channelContext.setHeartbeatTimeout(Long.valueOf(keepAliveTimeSeconds * KEEP_ALIVE_UNIT));
        }
        if (variableHeader.isWillFlag()) {
            Message message = new Message();
            message.setMessageType(MessageType.DOWN_STREAM);
            message.setFromClientId(uniqueId);
            message.setFromUsername(userName);
            message.setTopic(payload.willTopic());
            byte[] willMessageInBytes = payload.willMessageInBytes();
            if (willMessageInBytes != null) {
                message.setPayload(ByteBuffer.wrap(willMessageInBytes));
            }
            message.setQos(variableHeader.willQos());
            message.setRetain(variableHeader.isWillRetain());
            message.setTimestamp(System.currentTimeMillis());
            Node clientNode = channelContext.getClientNode();
            message.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
            message.setNode(this.nodeName);
            this.messageStore.addWillMessage(uniqueId, message);
        }
        connAckByReturnCode(clientIdentifier, uniqueId, channelContext, MqttConnectReasonCode.CONNECTION_ACCEPTED);
        this.executor.execute(() -> {
            try {
                this.connectStatusListener.online(channelContext, uniqueId);
            } catch (Throwable th) {
                logger.error("Mqtt server uniqueId:{} clientId:{} online notify error.", new Object[]{uniqueId, clientIdentifier, th});
            }
        });
    }

    private static void connAckByReturnCode(String str, String str2, ChannelContext channelContext, MqttConnectReasonCode mqttConnectReasonCode) {
        Tio.send(channelContext, MqttMessageBuilders.connAck().returnCode(mqttConnectReasonCode).sessionPresent(false).build());
        if (MqttConnectReasonCode.CONNECTION_ACCEPTED == mqttConnectReasonCode) {
            logger.info("Connect successful, clientId: {} uniqueId:{}", str, str2);
        } else {
            logger.error("Connect error - clientId: {} uniqueId:{} returnCode:{}", new Object[]{str, str2, mqttConnectReasonCode});
        }
    }

    private void sendConnected(ChannelContext channelContext, String str) {
        Message message = new Message();
        message.setClientId(str);
        message.setMessageType(MessageType.CONNECT);
        message.setNode(this.nodeName);
        message.setTimestamp(System.currentTimeMillis());
        Node clientNode = channelContext.getClientNode();
        message.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
        this.messageDispatcher.send(message);
    }

    private void cleanSession(String str) {
        try {
            this.sessionManager.remove(str);
        } catch (Throwable th) {
            logger.error("Mqtt server clientId:{} session clean error.", str, th);
        }
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPublish(ChannelContext channelContext, MqttPublishMessage mqttPublishMessage) {
        String bsId = channelContext.getBsId();
        MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
        MqttQoS qosLevel = fixedHeader.qosLevel();
        MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
        String str = variableHeader.topicName();
        if (this.publishPermission != null && !this.publishPermission.verifyPermission(channelContext, bsId, str, qosLevel, fixedHeader.isRetain())) {
            logger.error("Mqtt clientId:{} topic:{} no publish permission.", bsId, str);
            return;
        }
        int packetId = variableHeader.packetId();
        logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", new Object[]{bsId, str, qosLevel, Integer.valueOf(packetId)});
        switch (AnonymousClass1.$SwitchMap$net$dreamlu$iot$mqtt$codec$MqttQoS[qosLevel.ordinal()]) {
            case 1:
                invokeListenerForPublish(channelContext, bsId, qosLevel, str, mqttPublishMessage);
                return;
            case 2:
                invokeListenerForPublish(channelContext, bsId, qosLevel, str, mqttPublishMessage);
                if (packetId != -1) {
                    logger.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{bsId, str, qosLevel, Integer.valueOf(packetId), Tio.send(channelContext, MqttMessageBuilders.pubAck().packetId(packetId).build())});
                    return;
                }
                return;
            case 3:
                if (packetId != -1) {
                    MqttMessage mqttMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(packetId));
                    MqttPendingQos2Publish mqttPendingQos2Publish = new MqttPendingQos2Publish(mqttPublishMessage, mqttMessage);
                    logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{bsId, str, qosLevel, Integer.valueOf(packetId), Tio.send(channelContext, mqttMessage)});
                    this.sessionManager.addPendingQos2Publish(bsId, packetId, mqttPendingQos2Publish);
                    mqttPendingQos2Publish.startPubRecRetransmitTimer(this.executor, mqttMessage2 -> {
                        Tio.send(channelContext, mqttMessage2);
                    });
                    return;
                }
                return;
            case 4:
            default:
                return;
        }
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPubAck(ChannelContext channelContext, MqttMessageIdVariableHeader mqttMessageIdVariableHeader) {
        int messageId = mqttMessageIdVariableHeader.messageId();
        String bsId = channelContext.getBsId();
        logger.debug("PubAck - clientId:{}, messageId:{}", bsId, Integer.valueOf(messageId));
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(bsId, messageId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        this.sessionManager.removePendingPublish(bsId, messageId);
        pendingPublish.getPayload().clear();
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPubRec(ChannelContext channelContext, MqttMessageIdVariableHeader mqttMessageIdVariableHeader) {
        String bsId = channelContext.getBsId();
        int messageId = mqttMessageIdVariableHeader.messageId();
        logger.debug("PubRec - clientId:{}, messageId:{}", bsId, Integer.valueOf(messageId));
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(bsId, messageId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        MqttMessage mqttMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0), mqttMessageIdVariableHeader);
        Tio.send(channelContext, mqttMessage);
        pendingPublish.setPubRelMessage(mqttMessage);
        pendingPublish.startPubRelRetransmissionTimer(this.executor, mqttMessage2 -> {
            Tio.send(channelContext, mqttMessage2);
        });
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPubRel(ChannelContext channelContext, MqttMessageIdVariableHeader mqttMessageIdVariableHeader) {
        String bsId = channelContext.getBsId();
        int messageId = mqttMessageIdVariableHeader.messageId();
        logger.debug("PubRel - clientId:{}, messageId:{}", bsId, Integer.valueOf(messageId));
        MqttPendingQos2Publish pendingQos2Publish = this.sessionManager.getPendingQos2Publish(bsId, messageId);
        if (pendingQos2Publish != null) {
            MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
            invokeListenerForPublish(channelContext, bsId, incomingPublish.fixedHeader().qosLevel(), incomingPublish.variableHeader().topicName(), incomingPublish);
            pendingQos2Publish.onPubRelReceived();
            this.sessionManager.removePendingQos2Publish(bsId, messageId);
        }
        Tio.send(channelContext, MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId), (Object) null));
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPubComp(ChannelContext channelContext, MqttMessageIdVariableHeader mqttMessageIdVariableHeader) {
        int messageId = mqttMessageIdVariableHeader.messageId();
        String bsId = channelContext.getBsId();
        logger.debug("PubComp - clientId:{}, messageId:{}", bsId, Integer.valueOf(messageId));
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(bsId, messageId);
        if (pendingPublish != null) {
            pendingPublish.getPayload().clear();
            pendingPublish.onPubCompReceived();
            this.sessionManager.removePendingPublish(bsId, messageId);
        }
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processSubscribe(ChannelContext channelContext, MqttSubscribeMessage mqttSubscribeMessage) {
        String bsId = channelContext.getBsId();
        int messageId = mqttSubscribeMessage.variableHeader().messageId();
        List<MqttTopicSubscription> list = mqttSubscribeMessage.payload().topicSubscriptions();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = this.subscribeValidator != null;
        for (MqttTopicSubscription mqttTopicSubscription : list) {
            String str = mqttTopicSubscription.topicName();
            MqttQoS qualityOfService = mqttTopicSubscription.qualityOfService();
            if (!z || this.subscribeValidator.verifyTopicFilter(channelContext, bsId, str, qualityOfService)) {
                arrayList.add(qualityOfService);
                arrayList2.add(str);
                this.sessionManager.addSubscribe(str, bsId, qualityOfService.value());
                logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} messageId:{}", new Object[]{bsId, str, qualityOfService, Integer.valueOf(messageId)});
            } else {
                arrayList.add(MqttQoS.FAILURE);
                logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", new Object[]{bsId, str, qualityOfService, Integer.valueOf(messageId)});
            }
        }
        Tio.send(channelContext, MqttMessageBuilders.subAck().addGrantedQosList(arrayList).packetId(messageId).build());
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            List<Message> retainMessage = this.messageStore.getRetainMessage((String) it.next());
            if (retainMessage != null && !retainMessage.isEmpty()) {
                Iterator<Message> it2 = retainMessage.iterator();
                while (it2.hasNext()) {
                    this.messageDispatcher.send(bsId, it2.next());
                }
            }
        }
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processUnSubscribe(ChannelContext channelContext, MqttUnsubscribeMessage mqttUnsubscribeMessage) {
        String bsId = channelContext.getBsId();
        int messageId = mqttUnsubscribeMessage.variableHeader().messageId();
        List list = mqttUnsubscribeMessage.payload().topics();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            this.sessionManager.removeSubscribe((String) it.next(), bsId);
        }
        logger.info("UnSubscribe - clientId:{} Topic:{} messageId:{}", new Object[]{bsId, list, Integer.valueOf(messageId)});
        Tio.send(channelContext, MqttMessageBuilders.unsubAck().packetId(messageId).build());
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processPingReq(ChannelContext channelContext) {
        logger.debug("PingReq - clientId:{}", channelContext.getBsId());
        Tio.send(channelContext, MqttMessage.PINGRESP);
    }

    @Override // net.dreamlu.iot.mqtt.core.server.MqttServerProcessor
    public void processDisConnect(ChannelContext channelContext) {
        logger.info("DisConnect - clientId:{} contextId:{}", channelContext.getBsId(), channelContext.getId());
        channelContext.set(MqttConst.DIS_CONNECTED, (byte) 1);
        Tio.remove(channelContext, "Mqtt DisConnect");
    }

    private void invokeListenerForPublish(ChannelContext channelContext, String str, MqttQoS mqttQoS, String str2, MqttPublishMessage mqttPublishMessage) {
        MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
        boolean isRetain = fixedHeader.isRetain();
        ByteBuffer payload = mqttPublishMessage.payload();
        if (isRetain) {
            if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload == null || payload.array().length == 0) {
                this.messageStore.clearRetainMessage(str2);
            } else {
                Message message = new Message();
                message.setTopic(str2);
                message.setQos(mqttQoS.value());
                message.setPayload(payload);
                message.setFromClientId(str);
                message.setMessageType(MessageType.DOWN_STREAM);
                message.setRetain(true);
                message.setDup(fixedHeader.isDup());
                message.setTimestamp(System.currentTimeMillis());
                Node clientNode = channelContext.getClientNode();
                message.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
                message.setNode(this.nodeName);
                this.messageStore.addRetainMessage(str2, message);
            }
        }
        int packetId = mqttPublishMessage.variableHeader().packetId();
        Message message2 = new Message();
        message2.setId(Integer.valueOf(packetId));
        message2.setFromClientId(str);
        message2.setTopic(str2);
        message2.setQos(mqttQoS.value());
        if (payload != null) {
            message2.setPayload(payload);
        }
        message2.setMessageType(MessageType.UP_STREAM);
        message2.setRetain(isRetain);
        message2.setDup(fixedHeader.isDup());
        message2.setTimestamp(System.currentTimeMillis());
        Node clientNode2 = channelContext.getClientNode();
        message2.setPeerHost(clientNode2.getIp() + ':' + clientNode2.getPort());
        message2.setNode(this.nodeName);
        try {
            this.messageListener.onMessage(channelContext, str, message2);
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }
}
