package org.apache.iotdb.db.service;

import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/service/MQTTService.class */
public class MQTTService implements IService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MQTTService.class);
    private final Server server;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/service/MQTTService$MQTTServiceHolder.class */
    public static class MQTTServiceHolder {
        private static final MQTTService INSTANCE = new MQTTService();

        private MQTTServiceHolder() {
        }
    }

    private MQTTService() {
        this.server = new Server();
    }

    @Override // org.apache.iotdb.commons.service.IService
    public void start() {
        startup();
    }

    @Override // org.apache.iotdb.commons.service.IService
    public void stop() {
        shutdown();
    }

    public void startup() {
        IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
        IConfig createBrokerConfig = createBrokerConfig(config);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(new MPPPublishHandler(config));
        this.server.startServer(createBrokerConfig, arrayList, null, new BrokerAuthenticator(), null);
        LOG.info("Start MQTT service successfully, listening on ip {} port {}", config.getMqttHost(), Integer.valueOf(config.getMqttPort()));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOG.info("Stopping IoTDB MQTT service...");
            shutdown();
            LOG.info("IoTDB MQTT service stopped.");
        }));
    }

    private IConfig createBrokerConfig(IoTDBConfig ioTDBConfig) {
        Properties properties = new Properties();
        properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, ioTDBConfig.getMqttHost());
        properties.setProperty("port", String.valueOf(ioTDBConfig.getMqttPort()));
        properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, String.valueOf(ioTDBConfig.getMqttHandlerPoolSize()));
        properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, SQLConstant.BOOLEAN_TRUE);
        properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, SQLConstant.BOOLEAN_FALSE);
        properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, SQLConstant.BOOLEAN_TRUE);
        properties.setProperty(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(ioTDBConfig.getMqttMaxMessageSize()));
        return new MemoryConfig(properties);
    }

    public void shutdown() {
        this.server.stopServer();
    }

    @Override // org.apache.iotdb.commons.service.IService
    public ServiceType getID() {
        return ServiceType.MQTT_SERVICE;
    }

    public static MQTTService getInstance() {
        return MQTTServiceHolder.INSTANCE;
    }
}
