package cool.doudou.doudada.mqtt.config;

import cool.doudou.doudada.mqtt.core.callback.MqttSubscribeCallback;
import cool.doudou.doudada.mqtt.core.factory.CallbackMapFactory;
import cool.doudou.doudada.mqtt.core.factory.MethodMapFactory;
import cool.doudou.doudada.mqtt.core.method.SubscribeMethod;
import cool.doudou.doudada.mqtt.properties.MqttProperties;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:cool/doudou/doudada/mqtt/config/MqttMessageConfig.class */
public class MqttMessageConfig {
    private static final Logger log = LoggerFactory.getLogger(MqttMessageConfig.class);
    private MqttProperties mqttProperties;

    @Bean
    public MessageProducer mqttInbound(MqttPahoClientFactory mqttPahoClientFactory, MessageChannel messageChannel, MessageChannel messageChannel2) {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(this.mqttProperties.getClientId() + "_in", mqttPahoClientFactory, this.mqttProperties.getTopics());
        mqttPahoMessageDrivenChannelAdapter.setCompletionTimeout(5000L);
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        defaultPahoMessageConverter.setPayloadAsBytes(true);
        mqttPahoMessageDrivenChannelAdapter.setConverter(defaultPahoMessageConverter);
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageChannel);
        mqttPahoMessageDrivenChannelAdapter.setErrorChannel(messageChannel2);
        mqttPahoMessageDrivenChannelAdapter.setQos(new int[]{this.mqttProperties.getQos().intValue()});
        return mqttPahoMessageDrivenChannelAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handler() {
        return message -> {
            String str = null;
            MessageHeaders headers = message.getHeaders();
            if (!headers.isEmpty()) {
                str = String.valueOf(headers.get("mqtt_receivedTopic"));
            }
            if (str == null) {
                log.error("handler: topic is null");
                return;
            }
            byte[] bArr = (byte[]) message.getPayload();
            MqttSubscribeCallback mqttSubscribeCallback = CallbackMapFactory.get(str);
            if (mqttSubscribeCallback != null) {
                mqttSubscribeCallback.messageArrived(str, bArr);
                return;
            }
            SubscribeMethod subscribeMethod = MethodMapFactory.get(str);
            if (subscribeMethod == null) {
                log.warn("topic[{}]: No handler or method found", str);
                return;
            }
            Object bean = subscribeMethod.getBean();
            Method method = subscribeMethod.getMethod();
            try {
                method.setAccessible(true);
                method.invoke(bean, bArr);
            } catch (Exception e) {
                log.error("bean[{}].method[{}] invoke exception: ", new Object[]{bean, method.getName(), e});
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttErrorChannel")
    public MessageHandler errorHandler() {
        return message -> {
            log.error("errorHandler: {}", message);
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(this.mqttProperties.getClientId() + "_out", mqttPahoClientFactory);
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultTopic(this.mqttProperties.getTopics()[0]);
        return mqttPahoMessageHandler;
    }

    public MqttMessageConfig(MqttProperties mqttProperties) {
        this.mqttProperties = mqttProperties;
    }
}
