/*
 * Decompiled with CFR 0.152.
 */
package cn.herodotus.stirrup.transform.emqx.config;

import cn.herodotus.stirrup.message.mqtt.annotation.ConditionalOnMqttEnabled;
import cn.herodotus.stirrup.message.mqtt.config.MessageMqttConfiguration;
import cn.herodotus.stirrup.transform.emqx.annotation.ConditionalOnEmqxSystemTopicEvent;
import cn.herodotus.stirrup.transform.emqx.transformer.SystemClientByteArrayToEventTransformer;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.core.GenericTransformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.event.outbound.ApplicationEventPublishingMessageHandler;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring configuration that wires an integration flow from EMQX client system topics
 * to an {@link ApplicationEventPublishingMessageHandler}.
 *
 * <p>The flow declared here:
 * <ol>
 *   <li>subscribes an MQTT v5 inbound adapter to the {@code $SYS/brokers/+/clients/#} topic filter,</li>
 *   <li>passes each inbound message through a {@link SystemClientByteArrayToEventTransformer},</li>
 *   <li>routes the result over a direct channel named {@code emqxDefaultEventOutboundChannel},</li>
 *   <li>and finally hands it to the event-publishing message handler.</li>
 * </ol>
 *
 * <p>The configuration is only active when both {@link ConditionalOnMqttEnabled} and
 * {@link ConditionalOnEmqxSystemTopicEvent} match, and it imports
 * {@link MessageMqttConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnMqttEnabled
@ConditionalOnEmqxSystemTopicEvent
@Import(MessageMqttConfiguration.class)
public class EmqxSystemClientToEventFlowConfiguration {

    private static final Logger log = LoggerFactory.getLogger(EmqxSystemClientToEventFlowConfiguration.class);

    /** Topic filters handed to the inbound MQTT adapter. */
    private static final String[] EMQX_MONITOR_TOPICS = {"$SYS/brokers/+/clients/#"};

    /**
     * Emits a debug log line once this configuration instance has been constructed.
     */
    @PostConstruct
    public void postConstruct() {
        log.debug("[Herodotus] |- SDK [Emqx $sys/client To Event Flow] Auto Configure.");
    }

    /**
     * Creates the publish-subscribe channel used as the output channel of the inbound MQTT adapter.
     *
     * @return the channel, registered under the bean name {@code emqxDefaultMonitorMqttInboundChannel}
     */
    @Bean("emqxDefaultMonitorMqttInboundChannel")
    public MessageChannel emqxMonitorInboundChannel() {
        return MessageChannels.publishSubscribe().getObject();
    }

    /**
     * Creates the terminal handler of the flow.
     *
     * <p>{@code publishPayload} is switched on; per the Spring Integration API this publishes the
     * message payload itself rather than wrapping the whole message in an event.
     *
     * @return the configured event-publishing handler
     */
    @Bean
    public ApplicationEventPublishingMessageHandler emqxSystemClientEventPublishingMessageHandler() {
        ApplicationEventPublishingMessageHandler eventPublisher = new ApplicationEventPublishingMessageHandler();
        eventPublisher.setPublishPayload(true);
        return eventPublisher;
    }

    /**
     * Assembles the integration flow from the EMQX client system topics to the event-publishing handler.
     *
     * @param clientManager the shared MQTT v5 client manager the inbound adapter is built on
     * @param emqxSystemClientEventPublishingMessageHandler the handler that terminates the flow
     * @param emqxMonitorInboundChannel the channel the inbound adapter writes to
     *        (the {@code emqxDefaultMonitorMqttInboundChannel} bean)
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow emqxSystemClientToEventFlow(
            ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
            ApplicationEventPublishingMessageHandler emqxSystemClientEventPublishingMessageHandler,
            @Qualifier("emqxDefaultMonitorMqttInboundChannel") MessageChannel emqxMonitorInboundChannel) {

        // Inbound side: MQTT v5 adapter subscribed to the monitored topic filters.
        Mqttv5PahoMessageDrivenChannelAdapter inboundAdapter =
                new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, EMQX_MONITOR_TOPICS);
        inboundAdapter.setPayloadType(String.class);
        inboundAdapter.setManualAcks(false);
        inboundAdapter.setOutputChannel(emqxMonitorInboundChannel);

        // The typed locals below pin the DSL overloads (GenericTransformer / MessageChannelSpec /
        // MessageHandler) explicitly, and each step is created only when it is appended.
        IntegrationFlowBuilder flow = IntegrationFlow.from(inboundAdapter);

        GenericTransformer<?, ?> payloadToEvent = new SystemClientByteArrayToEventTransformer();
        flow = flow.transform(payloadToEvent);

        MessageChannelSpec<?, ?> eventOutboundChannel = MessageChannels.direct("emqxDefaultEventOutboundChannel");
        flow = flow.channel(eventOutboundChannel);

        MessageHandler eventPublisher = emqxSystemClientEventPublishingMessageHandler;
        flow = flow.handle(eventPublisher);

        return flow.get();
    }
}

