package net.dreamlu.iot.mqtt.core.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import net.dreamlu.iot.mqtt.codec.MqttCodecUtil;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.model.ClientInfo;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.stat.vo.StatVo;
import org.tio.server.TioServer;
import org.tio.server.TioServerConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.page.Page;
import org.tio.utils.page.PageUtils;
import org.tio.utils.timer.TimerTask;
import org.tio.utils.timer.TimerTaskService;

/* loaded from: input_file:net/dreamlu/iot/mqtt/core/server/MqttServer.class */
public final class MqttServer {
    private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
    private static final List<MqttServer> MQTT_SERVERS = new ArrayList();
    private final TioServer tioServer;
    private final MqttWebServer webServer;
    private final MqttServerCreator serverCreator;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageStore messageStore;
    private final ExecutorService mqttExecutor;
    private final TimerTaskService taskService;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MqttServer(TioServer tioServer, MqttWebServer mqttWebServer, MqttServerCreator mqttServerCreator, TimerTaskService timerTaskService) {
        this.tioServer = tioServer;
        this.webServer = mqttWebServer;
        this.serverCreator = mqttServerCreator;
        this.sessionManager = mqttServerCreator.getSessionManager();
        this.messageStore = mqttServerCreator.getMessageStore();
        this.mqttExecutor = mqttServerCreator.getMqttExecutor();
        this.taskService = timerTaskService;
        MQTT_SERVERS.add(this);
    }

    public static MqttServerCreator create() {
        return new MqttServerCreator();
    }

    public TioServer getTioServer() {
        return this.tioServer;
    }

    public MqttWebServer getWebServer() {
        return this.webServer;
    }

    public TioServerConfig getServerConfig() {
        return this.tioServer.getTioServerConfig();
    }

    public MqttServerCreator getServerCreator() {
        return this.serverCreator;
    }

    public boolean publish(String str, String str2, byte[] bArr) {
        return publish(str, str2, bArr, MqttQoS.AT_MOST_ONCE);
    }

    public boolean publish(String str, String str2, byte[] bArr, MqttQoS mqttQoS) {
        return publish(str, str2, bArr, mqttQoS, false);
    }

    public boolean publish(String str, String str2, byte[] bArr, boolean z) {
        return publish(str, str2, bArr, MqttQoS.AT_MOST_ONCE, z);
    }

    public boolean publish(String str, String str2, byte[] bArr, MqttQoS mqttQoS, boolean z) {
        TopicUtil.validateTopicName(str2);
        if (z) {
            saveRetainMessage(str2, mqttQoS, bArr);
        }
        ChannelContext byBsId = Tio.getByBsId(getServerConfig(), str);
        if (byBsId == null || byBsId.isClosed()) {
            logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", str2, str);
            return false;
        }
        Byte searchSubscribe = this.sessionManager.searchSubscribe(str2, str);
        if (searchSubscribe != null) {
            return publish(byBsId, str, str2, bArr, mqttQoS.value() > searchSubscribe.byteValue() ? MqttQoS.valueOf(searchSubscribe.byteValue()) : mqttQoS, z);
        }
        if (this.serverCreator.isDebug()) {
            logger.info("Mqtt Topic:{} publish but clientId:{} not subscribed.", str2, str);
            return false;
        }
        logger.debug("Mqtt Topic:{} publish but clientId:{} not subscribed.", str2, str);
        return false;
    }

    private boolean publish(ChannelContext channelContext, String str, String str2, byte[] bArr, MqttQoS mqttQoS, boolean z) {
        boolean z2 = MqttQoS.AT_LEAST_ONCE == mqttQoS || MqttQoS.EXACTLY_ONCE == mqttQoS;
        int messageId = z2 ? this.sessionManager.getMessageId(str) : -1;
        MqttPublishMessage build = MqttMessageBuilders.publish().topicName(str2).payload(bArr).qos(mqttQoS).retained(z).messageId(messageId).build();
        boolean send = Tio.send(channelContext, build);
        logger.debug("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", new Object[]{str2, mqttQoS, Boolean.valueOf(z), str, Boolean.valueOf(send)});
        if (z2) {
            MqttPendingPublish mqttPendingPublish = new MqttPendingPublish(bArr, build, mqttQoS);
            this.sessionManager.addPendingPublish(str, messageId, mqttPendingPublish);
            mqttPendingPublish.startPublishRetransmissionTimer(this.taskService, mqttMessage -> {
                Tio.send(channelContext, mqttMessage);
            });
        }
        return send;
    }

    public boolean publishAll(String str, byte[] bArr) {
        return publishAll(str, bArr, MqttQoS.AT_MOST_ONCE);
    }

    public boolean publishAll(String str, byte[] bArr, MqttQoS mqttQoS) {
        return publishAll(str, bArr, mqttQoS, false);
    }

    public boolean publishAll(String str, byte[] bArr, boolean z) {
        return publishAll(str, bArr, MqttQoS.AT_MOST_ONCE, z);
    }

    public boolean publishAll(String str, byte[] bArr, MqttQoS mqttQoS, boolean z) {
        TopicUtil.validateTopicName(str);
        if (z) {
            saveRetainMessage(str, mqttQoS, bArr);
        }
        List<Subscribe> searchSubscribe = this.sessionManager.searchSubscribe(str);
        if (searchSubscribe.isEmpty()) {
            logger.debug("Mqtt Topic:{} publishAll but subscribe client list is empty.", str);
            return false;
        }
        for (Subscribe subscribe : searchSubscribe) {
            String clientId = subscribe.getClientId();
            ChannelContext byBsId = Tio.getByBsId(getServerConfig(), clientId);
            if (byBsId == null || byBsId.isClosed()) {
                logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", str, clientId);
            } else {
                int mqttQoS2 = subscribe.getMqttQoS();
                publish(byBsId, clientId, str, bArr, mqttQoS.value() > mqttQoS2 ? MqttQoS.valueOf(mqttQoS2) : mqttQoS, false);
            }
        }
        return true;
    }

    public ClientInfo getClientInfo(String str) {
        ChannelContext byBsId = Tio.getByBsId(getServerConfig(), str);
        if (byBsId == null) {
            return null;
        }
        return ClientInfo.form(this.serverCreator, byBsId, ClientInfo::new);
    }

    public ClientInfo getClientInfo(ChannelContext channelContext) {
        return ClientInfo.form(this.serverCreator, channelContext, ClientInfo::new);
    }

    public List<ClientInfo> getClients() {
        return getClients(this.serverCreator, (TioConfig) getServerConfig());
    }

    public static List<ClientInfo> getClients(MqttServerCreator mqttServerCreator, TioConfig tioConfig) {
        return (List) Tio.getAll(tioConfig).stream().filter(MqttServer::isMqtt).map(channelContext -> {
            return ClientInfo.form(mqttServerCreator, channelContext, ClientInfo::new);
        }).collect(Collectors.toList());
    }

    public StatVo getStat() {
        return this.tioServer.getTioServerConfig().getStat();
    }

    private static boolean isMqtt(ChannelContext channelContext) {
        return StrUtil.isNotBlank(channelContext.getBsId());
    }

    public Page<ClientInfo> getClients(Integer num, Integer num2) {
        return getClients(this.serverCreator, getServerConfig(), num, num2);
    }

    public static Page<ClientInfo> getClients(MqttServerCreator mqttServerCreator, TioConfig tioConfig, Integer num, Integer num2) {
        return PageUtils.fromSet((Set) Tio.getAll(tioConfig).stream().filter(MqttServer::isMqtt).collect(Collectors.toSet()), num.intValue(), num2.intValue(), channelContext -> {
            return ClientInfo.form(mqttServerCreator, channelContext, ClientInfo::new);
        });
    }

    public boolean sendToClient(String str, Message message) {
        String clientId = message.getClientId();
        MqttQoS valueOf = MqttQoS.valueOf(message.getQos());
        return StrUtil.isBlank(clientId) ? publishAll(str, message.getPayload(), valueOf, message.isRetain()) : publish(clientId, str, message.getPayload(), valueOf, message.isRetain());
    }

    private void saveRetainMessage(String str, MqttQoS mqttQoS, byte[] bArr) {
        Message message = new Message();
        message.setTopic(str);
        message.setQos(mqttQoS.value());
        message.setPayload(bArr);
        message.setMessageType(MessageType.DOWN_STREAM);
        message.setRetain(true);
        message.setDup(false);
        message.setTimestamp(System.currentTimeMillis());
        message.setNode(this.serverCreator.getNodeName());
        this.messageStore.addRetainMessage(str, message);
    }

    public TimerTask schedule(Runnable runnable, long j) {
        return schedule(runnable, j, null);
    }

    public TimerTask schedule(Runnable runnable, long j, Executor executor) {
        return this.serverCreator.getTaskService().addTask(systemTimer -> {
            return new TimerTask(j) { // from class: net.dreamlu.iot.mqtt.core.server.MqttServer.1
                public void run() {
                    try {
                        systemTimer.add(this);
                        if (executor == null) {
                            runnable.run();
                        } else {
                            executor.execute(runnable);
                        }
                    } catch (Exception e) {
                        MqttServer.logger.error("Mqtt server schedule error", e);
                    }
                }
            };
        });
    }

    public TimerTask scheduleOnce(Runnable runnable, long j) {
        return scheduleOnce(runnable, j, null);
    }

    public TimerTask scheduleOnce(Runnable runnable, long j, Executor executor) {
        return this.serverCreator.getTaskService().addTask(systemTimer -> {
            return new TimerTask(j) { // from class: net.dreamlu.iot.mqtt.core.server.MqttServer.2
                public void run() {
                    try {
                        if (executor == null) {
                            runnable.run();
                        } else {
                            executor.execute(runnable);
                        }
                    } catch (Exception e) {
                        MqttServer.logger.error("Mqtt server schedule once error", e);
                    }
                }
            };
        });
    }

    public boolean disconnect(String str) {
        return disconnect(getChannelContext(str), str);
    }

    private boolean disconnect(ChannelContext channelContext, String str) {
        if (channelContext == null || str == null) {
            logger.error("Mqtt server disconnect channelContext:{} or clientId:{} is null.", channelContext, str);
            return false;
        }
        MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(channelContext);
        if (MqttVersion.MQTT_5 != mqttVersion) {
            logger.error("Mqtt server disconnect clientId:{} mqtt version:{} not support.", str, mqttVersion);
            return false;
        }
        boolean bSend = Tio.bSend(channelContext, MqttMessage.DISCONNECT);
        if (bSend) {
            channelContext.setBizStatus(true);
            Tio.remove(channelContext, "Mqtt DisConnect");
        }
        return bSend;
    }

    public ChannelContext getChannelContext(String str) {
        return Tio.getByBsId(getServerConfig(), str);
    }

    public static MqttServer getMqttServer(ChannelContext channelContext) {
        return (MqttServer) channelContext.computeIfAbsent("_M_S_", str -> {
            return MQTT_SERVERS.stream().filter(mqttServer -> {
                return mqttServer.tioServer.getTioServerConfig().equals(channelContext.getTioConfig()) || mqttServer.webServer.getServerTioConfig().equals(channelContext.getTioConfig());
            }).findFirst().orElseThrow(() -> {
                return new IllegalStateException("Can not found MqttServer.");
            });
        });
    }

    public void close(String str) {
        Tio.remove(getChannelContext(str), "Mqtt server close this connects.");
    }

    public boolean start() {
        this.taskService.start();
        try {
            this.tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
            if (this.webServer == null) {
                return true;
            }
            try {
                this.webServer.start();
                return true;
            } catch (IOException e) {
                throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e);
            }
        } catch (IOException e2) {
            throw new IllegalStateException("Mica mqtt tcp server start fail.", e2);
        }
    }

    public boolean stop() {
        this.taskService.stop();
        boolean stop = this.tioServer.stop();
        logger.info("Mqtt tcp server stop result:{}", Boolean.valueOf(stop));
        if (this.webServer != null) {
            stop &= this.webServer.stop();
            logger.info("Mqtt websocket server stop result:{}", Boolean.valueOf(stop));
        }
        try {
            this.mqttExecutor.shutdown();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        try {
            stop &= this.mqttExecutor.awaitTermination(10L, TimeUnit.MINUTES);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            logger.error(e2.getMessage(), e2);
        }
        try {
            this.sessionManager.clean();
        } catch (Throwable th) {
            logger.error("MqttServer stop session clean error.", th);
        }
        return stop;
    }
}
