package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Packet handler for MQTT CONNECT messages.
 *
 * <p>{@link #preHandler} copies the keep-alive interval, client identifier and clean-session flag
 * from the CONNECT packet onto the channel. {@link #doHandler} then either rejects the connection
 * (when the upstream hook failed) or registers a pending connect future on the channel, starts the
 * session load and sends the CONNACK once that future completes.
 */
@Component
public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage> {
    private static final Logger logger = LoggerFactory.getLogger(MqttConnectHandler.class);

    /** Delay, in seconds, after which a still-pending connect future is completed by the scheduler. */
    private static final long CONNECT_FUTURE_TIMEOUT_SECONDS = 1L;

    @Resource
    private ChannelManager channelManager;

    @Resource
    private SessionLoop sessionLoop;

    /** Single-threaded timer that completes connect futures nobody else completed in time. */
    private final ScheduledThreadPoolExecutor scheduler =
        new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));

    /**
     * Records the connection attributes carried by the CONNECT packet on the channel.
     *
     * @param channelHandlerContext context whose channel receives the attributes
     * @param mqttConnectMessage    the received CONNECT packet
     * @return always {@code true}
     */
    @Override // org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler
    public boolean preHandler(ChannelHandlerContext channelHandlerContext, MqttConnectMessage mqttConnectMessage) {
        MqttConnectVariableHeader variableHeader = mqttConnectMessage.variableHeader();
        Channel channel = channelHandlerContext.channel();
        ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
        ChannelInfo.setClientId(channel, mqttConnectMessage.payload().clientIdentifier());
        ChannelInfo.setCleanSessionFlag(channel, Boolean.valueOf(variableHeader.isCleanSession()));
        return true;
    }

    /**
     * Completes the CONNECT handshake according to the upstream hook result.
     *
     * <p>On hook failure the connection is rejected and closed. On success a connect future is
     * stored on the channel under {@link ChannelInfo#FUTURE_CONNECT}; the CONNACK is written when
     * that future completes and the channel is still active. The future is completed by the
     * internal scheduler after {@value #CONNECT_FUTURE_TIMEOUT_SECONDS} second(s) unless it is
     * already done by then (presumably by the session-loading path; confirm against
     * {@code SessionLoop}).
     *
     * @param channelHandlerContext context of the connecting channel
     * @param mqttConnectMessage    the received CONNECT packet
     * @param hookResult            outcome of the upstream hooks
     */
    @Override // org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler
    public void doHandler(ChannelHandlerContext channelHandlerContext, MqttConnectMessage mqttConnectMessage, HookResult hookResult) {
        Channel channel = channelHandlerContext.channel();
        if (!hookResult.isSuccess()) {
            rejectConnect(channel, hookResult);
            return;
        }

        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
        ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, connectFuture);

        // Fallback: make sure the future completes even if nothing else completes it.
        this.scheduler.schedule(() -> {
            if (!connectFuture.isDone()) {
                connectFuture.complete(null);
            }
        }, CONNECT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        try {
            MqttConnAckMessage mqttConnAckMessage = getMqttConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
            connectFuture.thenAccept(ignored -> {
                // The client may have gone away while the future was pending.
                if (channel.isActive()) {
                    ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
                    channel.writeAndFlush(mqttConnAckMessage);
                }
            });
            this.sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);
        } catch (Exception e) {
            logger.error("Connect:{}", mqttConnectMessage.payload().clientIdentifier(), e);
            this.channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
        }
    }

    /**
     * Rejects a connection whose upstream hook failed: sends a CONNACK carrying the hook's sub code
     * when that code maps to an MQTT return code, then closes the channel with the hook's remark.
     * The channel is closed even when the sub code cannot be mapped, so a bad code can never leave
     * a rejected connection open.
     */
    private void rejectConnect(Channel channel, HookResult hookResult) {
        String remark = hookResult.getRemark();
        MqttConnectReturnCode returnCode = toReturnCode((byte) hookResult.getSubCode());
        if (returnCode != null) {
            channel.writeAndFlush(getMqttConnAckMessage(returnCode));
        } else {
            logger.warn("Connect rejected with unmapped sub code {}, closing without CONNACK: {}",
                hookResult.getSubCode(), remark);
        }
        this.channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
    }

    /**
     * Maps a raw code to an MQTT connect return code.
     *
     * @return the matching return code, or {@code null} when the code is not a known one
     */
    private static MqttConnectReturnCode toReturnCode(byte code) {
        try {
            return MqttConnectReturnCode.valueOf(code);
        } catch (IllegalArgumentException unknownCode) {
            // valueOf rejects codes outside the MQTT-defined set; report "no mapping" to the caller.
            return null;
        }
    }

    /** Builds a CONNACK message with the given return code and the session-present flag cleared. */
    private MqttConnAckMessage getMqttConnAckMessage(MqttConnectReturnCode mqttConnectReturnCode) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(mqttConnectReturnCode, false);
        return new MqttConnAckMessage(fixedHeader, variableHeader);
    }
}
